|
@@ -67,6 +67,7 @@ import org.apache.hadoop.io.IntWritable;
|
|
|
import org.apache.hadoop.ipc.RPC;
|
|
|
import org.apache.hadoop.ipc.RemoteException;
|
|
|
import org.apache.hadoop.ipc.Server;
|
|
|
+import org.apache.hadoop.mapred.TaskController.JobInitializationContext;
|
|
|
import org.apache.hadoop.mapred.TaskLog.LogFileDetail;
|
|
|
import org.apache.hadoop.mapred.TaskLog.LogName;
|
|
|
import org.apache.hadoop.mapred.TaskStatus.Phase;
|
|
@@ -219,13 +220,19 @@ public class TaskTracker
|
|
|
//for serving map output to the other nodes
|
|
|
|
|
|
static Random r = new Random();
|
|
|
- private static final String SUBDIR = "taskTracker";
|
|
|
- private static final String CACHEDIR = "archive";
|
|
|
- private static final String JOBCACHE = "jobcache";
|
|
|
- private static final String OUTPUT = "output";
|
|
|
+ static final String SUBDIR = "taskTracker";
|
|
|
+ private static final String DISTCACHEDIR = "distcache";
|
|
|
+ static final String JOBCACHE = "jobcache";
|
|
|
+ static final String OUTPUT = "output";
|
|
|
+ private static final String JARSDIR = "jars";
|
|
|
+ static final String LOCAL_SPLIT_FILE = "split.info";
|
|
|
+ static final String JOBFILE = "job.xml";
|
|
|
+
|
|
|
+ static final String JOB_LOCAL_DIR = "job.local.dir";
|
|
|
static final String JOB_TOKEN_FILE="jobToken"; //localized file
|
|
|
- private JobConf originalConf;
|
|
|
+
|
|
|
private JobConf fConf;
|
|
|
+ private JobConf originalConf;
|
|
|
private int maxMapSlots;
|
|
|
private int maxReduceSlots;
|
|
|
private int failures;
|
|
@@ -435,8 +442,8 @@ public class TaskTracker
|
|
|
return TaskTracker.SUBDIR + Path.SEPARATOR + user;
|
|
|
}
|
|
|
|
|
|
- static String getCacheSubdir() {
|
|
|
- return TaskTracker.SUBDIR + Path.SEPARATOR + TaskTracker.CACHEDIR;
|
|
|
+ static String getDistributedCacheDir() {
|
|
|
+ return TaskTracker.SUBDIR + Path.SEPARATOR + TaskTracker.DISTCACHEDIR;
|
|
|
}
|
|
|
|
|
|
static String getJobCacheSubdir() {
|
|
@@ -449,31 +456,66 @@ public class TaskTracker
|
|
|
}
|
|
|
|
|
|
static String getLocalJobDir(String jobid) {
|
|
|
- return getJobCacheSubdir() + Path.SEPARATOR + jobid;
|
|
|
+ return getJobCacheSubdir() + Path.SEPARATOR + jobid;
|
|
|
}
|
|
|
|
|
|
- static String getLocalTaskDir(String jobid, String taskid) {
|
|
|
- return getLocalTaskDir(jobid, taskid, false) ;
|
|
|
+ static String getLocalJobConfFile(String jobid) {
|
|
|
+ return getLocalJobDir(jobid) + Path.SEPARATOR + TaskTracker.JOBFILE;
|
|
|
}
|
|
|
|
|
|
- static String getIntermediateOutputDir(String jobid, String taskid) {
|
|
|
- return getLocalTaskDir(jobid, taskid)
|
|
|
- + Path.SEPARATOR + TaskTracker.OUTPUT ;
|
|
|
+ static String getTaskConfFile(String jobid, String taskid,
|
|
|
+ boolean isCleanupAttempt) {
|
|
|
+ return getLocalTaskDir(jobid, taskid, isCleanupAttempt) + Path.SEPARATOR
|
|
|
+ + TaskTracker.JOBFILE;
|
|
|
}
|
|
|
-
|
|
|
- static String getLocalJobTokenFile(String user, String jobid) {
|
|
|
- return getLocalJobDir(user, jobid) + Path.SEPARATOR + TaskTracker.JOB_TOKEN_FILE;
|
|
|
+
|
|
|
+ static String getJobJarsDir(String jobid) {
|
|
|
+ return getLocalJobDir(jobid) + Path.SEPARATOR + TaskTracker.JARSDIR;
|
|
|
+ }
|
|
|
+
|
|
|
+ static String getJobJarFile(String jobid) {
|
|
|
+ return getJobJarsDir(jobid) + Path.SEPARATOR + "job.jar";
|
|
|
}
|
|
|
|
|
|
+ static String getJobWorkDir(String jobid) {
|
|
|
+ return getLocalJobDir(jobid) + Path.SEPARATOR + MRConstants.WORKDIR;
|
|
|
+ }
|
|
|
+
|
|
|
+ static String getLocalSplitFile(String jobid, String taskid) {
|
|
|
+ return TaskTracker.getLocalTaskDir(jobid, taskid) + Path.SEPARATOR
|
|
|
+ + TaskTracker.LOCAL_SPLIT_FILE;
|
|
|
+ }
|
|
|
+
|
|
|
+ static String getIntermediateOutputDir(String jobid, String taskid) {
|
|
|
+ return getLocalTaskDir(jobid, taskid) + Path.SEPARATOR
|
|
|
+ + TaskTracker.OUTPUT;
|
|
|
+ }
|
|
|
|
|
|
- static String getLocalTaskDir(String jobid,
|
|
|
- String taskid,
|
|
|
- boolean isCleanupAttempt) {
|
|
|
- String taskDir = getLocalJobDir(jobid) + Path.SEPARATOR + taskid;
|
|
|
- if (isCleanupAttempt) {
|
|
|
+ static String getLocalTaskDir(String jobid, String taskid) {
|
|
|
+ return getLocalTaskDir(jobid, taskid, false);
|
|
|
+ }
|
|
|
+
|
|
|
+ static String getLocalTaskDir(String jobid, String taskid,
|
|
|
+ boolean isCleanupAttempt) {
|
|
|
+ String taskDir = getLocalJobDir(jobid) + Path.SEPARATOR + taskid;
|
|
|
+ if (isCleanupAttempt) {
|
|
|
taskDir = taskDir + TASK_CLEANUP_SUFFIX;
|
|
|
- }
|
|
|
- return taskDir;
|
|
|
+ }
|
|
|
+ return taskDir;
|
|
|
+ }
|
|
|
+
|
|
|
+ static String getTaskWorkDir(String jobid, String taskid,
|
|
|
+ boolean isCleanupAttempt) {
|
|
|
+ String dir =
|
|
|
+ getLocalJobDir(jobid) + Path.SEPARATOR + taskid;
|
|
|
+ if (isCleanupAttempt) {
|
|
|
+ dir = dir + TASK_CLEANUP_SUFFIX;
|
|
|
+ }
|
|
|
+ return dir + Path.SEPARATOR + MRConstants.WORKDIR;
|
|
|
+ }
|
|
|
+
|
|
|
+ static String getLocalJobTokenFile(String user, String jobid) {
|
|
|
+ return getLocalJobDir(user, jobid) + Path.SEPARATOR + TaskTracker.JOB_TOKEN_FILE;
|
|
|
}
|
|
|
|
|
|
private void setUgi(String user, Configuration conf) {
|
|
@@ -841,92 +883,25 @@ public class TaskTracker
|
|
|
Path localJarFile = null;
|
|
|
Task t = tip.getTask();
|
|
|
JobID jobId = t.getJobID();
|
|
|
- Path jobFile = new Path(t.getJobFile());
|
|
|
- String userName = t.getUser();
|
|
|
- JobConf userConf = new JobConf(getJobConf());
|
|
|
- setUgi(userName, userConf);
|
|
|
- FileSystem userFs = jobFile.getFileSystem(userConf);
|
|
|
- // Get sizes of JobFile and JarFile
|
|
|
- // sizes are -1 if they are not present.
|
|
|
- FileStatus status = null;
|
|
|
- long jobFileSize = -1;
|
|
|
- try {
|
|
|
- status = userFs.getFileStatus(jobFile);
|
|
|
- jobFileSize = status.getLen();
|
|
|
- } catch(FileNotFoundException fe) {
|
|
|
- jobFileSize = -1;
|
|
|
- }
|
|
|
- Path localJobFile = lDirAlloc.getLocalPathForWrite(
|
|
|
- getLocalJobDir(jobId.toString())
|
|
|
- + Path.SEPARATOR + "job.xml",
|
|
|
- jobFileSize, fConf);
|
|
|
+
|
|
|
RunningJob rjob = addTaskToJob(jobId, tip);
|
|
|
synchronized (rjob) {
|
|
|
if (!rjob.localized) {
|
|
|
-
|
|
|
- FileSystem localFs = FileSystem.getLocal(fConf);
|
|
|
- // this will happen on a partial execution of localizeJob.
|
|
|
- // Sometimes the job.xml gets copied but copying job.jar
|
|
|
- // might throw out an exception
|
|
|
- // we should clean up and then try again
|
|
|
- Path jobDir = localJobFile.getParent();
|
|
|
- if (localFs.exists(jobDir)){
|
|
|
- localFs.delete(jobDir, true);
|
|
|
- boolean b = localFs.mkdirs(jobDir);
|
|
|
- if (!b)
|
|
|
- throw new IOException("Not able to create job directory "
|
|
|
- + jobDir.toString());
|
|
|
- }
|
|
|
- userFs.copyToLocalFile(jobFile, localJobFile);
|
|
|
- JobConf localJobConf = new JobConf(localJobFile);
|
|
|
+ JobConf localJobConf = localizeJobFiles(t);
|
|
|
|
|
|
- // create the 'work' directory
|
|
|
- // job-specific shared directory for use as scratch space
|
|
|
- Path workDir = lDirAlloc.getLocalPathForWrite(
|
|
|
- (getLocalJobDir(jobId.toString())
|
|
|
- + Path.SEPARATOR + MRConstants.WORKDIR), fConf);
|
|
|
- if (!localFs.mkdirs(workDir)) {
|
|
|
- throw new IOException("Mkdirs failed to create "
|
|
|
- + workDir.toString());
|
|
|
- }
|
|
|
- System.setProperty("job.local.dir", workDir.toString());
|
|
|
- localJobConf.set("job.local.dir", workDir.toString());
|
|
|
+ // Now initialize the job via task-controller so as to set
|
|
|
+ // ownership/permissions of jars, job-work-dir. Note that initializeJob
|
|
|
+ // should be the last call after every other directory/file to be
|
|
|
+ // directly under the job directory is created.
|
|
|
+ JobInitializationContext context = new JobInitializationContext();
|
|
|
+ context.jobid = jobId;
|
|
|
+ context.user = localJobConf.getUser();
|
|
|
+ context.workDir = new File(localJobConf.get(JOB_LOCAL_DIR));
|
|
|
+ taskController.initializeJob(context);
|
|
|
|
|
|
- // copy Jar file to the local FS and unjar it.
|
|
|
- String jarFile = localJobConf.getJar();
|
|
|
- long jarFileSize = -1;
|
|
|
- if (jarFile != null) {
|
|
|
- Path jarFilePath = new Path(jarFile);
|
|
|
- try {
|
|
|
- status = userFs.getFileStatus(jarFilePath);
|
|
|
- jarFileSize = status.getLen();
|
|
|
- } catch(FileNotFoundException fe) {
|
|
|
- jarFileSize = -1;
|
|
|
- }
|
|
|
- // Here we check for and we check five times the size of jarFileSize
|
|
|
- // to accommodate for unjarring the jar file in work directory
|
|
|
- localJarFile = new Path(lDirAlloc.getLocalPathForWrite(
|
|
|
- getLocalJobDir(jobId.toString())
|
|
|
- + Path.SEPARATOR + "jars",
|
|
|
- 5 * jarFileSize, fConf), "job.jar");
|
|
|
- if (!localFs.mkdirs(localJarFile.getParent())) {
|
|
|
- throw new IOException("Mkdirs failed to create jars directory ");
|
|
|
- }
|
|
|
- userFs.copyToLocalFile(jarFilePath, localJarFile);
|
|
|
- localJobConf.setJar(localJarFile.toString());
|
|
|
- OutputStream out = localFs.create(localJobFile);
|
|
|
- try {
|
|
|
- localJobConf.writeXml(out);
|
|
|
- } finally {
|
|
|
- out.close();
|
|
|
- }
|
|
|
- // also unjar the job.jar files
|
|
|
- RunJar.unJar(new File(localJarFile.toString()),
|
|
|
- new File(localJarFile.getParent().toString()));
|
|
|
- }
|
|
|
+ rjob.jobConf = localJobConf;
|
|
|
rjob.keepJobFiles = ((localJobConf.getKeepTaskFilesPattern() != null) ||
|
|
|
localJobConf.getKeepFailedTaskFiles());
|
|
|
- rjob.jobConf = localJobConf;
|
|
|
// save local copy of JobToken file
|
|
|
localizeJobTokenFile(t.getUser(), jobId, localJobConf);
|
|
|
FSDataInputStream in = localFs.open(new Path(
|
|
@@ -936,13 +911,319 @@ public class TaskTracker
|
|
|
getJobTokenSecretManager().addTokenForJob(jobId.toString(), jt);
|
|
|
|
|
|
rjob.localized = true;
|
|
|
- taskController.initializeJob(jobId);
|
|
|
}
|
|
|
}
|
|
|
launchTaskForJob(tip, new JobConf(rjob.jobConf));
|
|
|
}
|
|
|
|
|
|
- private void launchTaskForJob(TaskInProgress tip, JobConf jobConf) throws IOException{
|
|
|
+ /**
|
|
|
+ * Localize the job on this tasktracker. Specifically
|
|
|
+ * <ul>
|
|
|
+ * <li>Cleanup and create job directories on all disks</li>
|
|
|
+ * <li>Download the job config file job.xml from the FS</li>
|
|
|
+ * <li>Create the job work directory and set {@link TaskTracker#JOB_LOCAL_DIR}
|
|
|
+   * in the configuration.</li>
|
|
|
+ * <li>Download the job jar file job.jar from the FS, unjar it and set jar
|
|
|
+ * file in the configuration.</li>
|
|
|
+ * </ul>
|
|
|
+ *
|
|
|
+ * @param t task whose job has to be localized on this TT
|
|
|
+ * @return the modified job configuration to be used for all the tasks of this
|
|
|
+ * job as a starting point.
|
|
|
+ * @throws IOException
|
|
|
+ */
|
|
|
+ JobConf localizeJobFiles(Task t)
|
|
|
+ throws IOException {
|
|
|
+ JobID jobId = t.getJobID();
|
|
|
+
|
|
|
+ Path jobFile = new Path(t.getJobFile());
|
|
|
+ String userName = t.getUser();
|
|
|
+ JobConf userConf = new JobConf(getJobConf());
|
|
|
+ setUgi(userName, userConf);
|
|
|
+ FileSystem userFs = jobFile.getFileSystem(userConf);
|
|
|
+
|
|
|
+ // Initialize the job directories first
|
|
|
+ FileSystem localFs = FileSystem.getLocal(fConf);
|
|
|
+ initializeJobDirs(jobId, localFs, fConf.getStrings("mapred.local.dir"));
|
|
|
+
|
|
|
+ // Download the job.xml for this job from the system FS
|
|
|
+ Path localJobFile =
|
|
|
+ localizeJobConfFile(new Path(t.getJobFile()), userFs, jobId);
|
|
|
+
|
|
|
+ JobConf localJobConf = new JobConf(localJobFile);
|
|
|
+
|
|
|
+ // create the 'job-work' directory: job-specific shared directory for use as
|
|
|
+ // scratch space by all tasks of the same job running on this TaskTracker.
|
|
|
+ Path workDir =
|
|
|
+ lDirAlloc.getLocalPathForWrite(getJobWorkDir(jobId.toString()),
|
|
|
+ fConf);
|
|
|
+ if (!localFs.mkdirs(workDir)) {
|
|
|
+ throw new IOException("Mkdirs failed to create "
|
|
|
+ + workDir.toString());
|
|
|
+ }
|
|
|
+ System.setProperty(JOB_LOCAL_DIR, workDir.toUri().getPath());
|
|
|
+ localJobConf.set(JOB_LOCAL_DIR, workDir.toUri().getPath());
|
|
|
+
|
|
|
+ // Download the job.jar for this job from the system FS
|
|
|
+ localizeJobJarFile(jobId, userFs, localJobConf);
|
|
|
+
|
|
|
+ return localJobConf;
|
|
|
+ }
|
|
|
+
|
|
|
+ static class PermissionsHandler {
|
|
|
+ /**
|
|
|
+ * Permission information useful for setting permissions for a given path.
|
|
|
+ * Using this, one can set all possible combinations of permissions for the
|
|
|
+ * owner of the file. But permissions for the group and all others can only
|
|
|
+ * be set together, i.e. permissions for group cannot be set different from
|
|
|
+ * those for others and vice versa.
|
|
|
+ */
|
|
|
+ static class PermissionsInfo {
|
|
|
+ public boolean readPermissions;
|
|
|
+ public boolean writePermissions;
|
|
|
+ public boolean executablePermissions;
|
|
|
+ public boolean readPermsOwnerOnly;
|
|
|
+ public boolean writePermsOwnerOnly;
|
|
|
+ public boolean executePermsOwnerOnly;
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Create a permissions-info object with the given attributes
|
|
|
+ *
|
|
|
+ * @param readPerms
|
|
|
+ * @param writePerms
|
|
|
+ * @param executePerms
|
|
|
+ * @param readOwnerOnly
|
|
|
+ * @param writeOwnerOnly
|
|
|
+ * @param executeOwnerOnly
|
|
|
+ */
|
|
|
+ public PermissionsInfo(boolean readPerms, boolean writePerms,
|
|
|
+ boolean executePerms, boolean readOwnerOnly, boolean writeOwnerOnly,
|
|
|
+ boolean executeOwnerOnly) {
|
|
|
+ readPermissions = readPerms;
|
|
|
+ writePermissions = writePerms;
|
|
|
+ executablePermissions = executePerms;
|
|
|
+ readPermsOwnerOnly = readOwnerOnly;
|
|
|
+ writePermsOwnerOnly = writeOwnerOnly;
|
|
|
+ executePermsOwnerOnly = executeOwnerOnly;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Set permission on the given file path using the specified permissions
|
|
|
+ * information. We use java api to set permission instead of spawning chmod
|
|
|
+ * processes. This saves a lot of time. Using this, one can set all possible
|
|
|
+ * combinations of permissions for the owner of the file. But permissions
|
|
|
+ * for the group and all others can only be set together, i.e. permissions
|
|
|
+ * for group cannot be set different from those for others and vice versa.
|
|
|
+ *
|
|
|
+ * This method should satisfy the needs of most of the applications. For
|
|
|
+ * those it doesn't, {@link FileUtil#chmod} can be used.
|
|
|
+ *
|
|
|
+ * @param f file path
|
|
|
+ * @param pInfo permissions information
|
|
|
+ * @return true if success, false otherwise
|
|
|
+ */
|
|
|
+    static boolean setPermissions(File f, PermissionsInfo pInfo) {
|
|
|
+      if (pInfo == null) {
|
|
|
+        LOG.debug(" PermissionsInfo is null, returning.");
|
|
|
+        return true;
|
|
|
+      }
|
|
|
+
|
|
|
+      LOG.debug("Setting permission for " + f.getAbsolutePath());
|
|
|
+      // Clear all the flags first; a failure here must also fail the call.
|
|
|
+      boolean ret = f.setReadable(false, false);
|
|
|
+      ret = f.setWritable(false, false) && ret;
|
|
|
+      ret = f.setExecutable(false, false) && ret;
|
|
|
+
|
|
|
+      // Keep the accumulated status: '&& ret' preserves earlier failures.
|
|
|
+      ret =
|
|
|
+          f.setReadable(pInfo.readPermissions, pInfo.readPermsOwnerOnly)
|
|
|
+              && ret;
|
|
|
+      LOG.debug("Readable status for " + f + " set to " + ret);
|
|
|
+      ret =
|
|
|
+          f.setWritable(pInfo.writePermissions, pInfo.writePermsOwnerOnly)
|
|
|
+              && ret;
|
|
|
+      LOG.debug("Writable status for " + f + " set to " + ret);
|
|
|
+      ret =
|
|
|
+          f.setExecutable(pInfo.executablePermissions,
|
|
|
+              pInfo.executePermsOwnerOnly)
|
|
|
+              && ret;
|
|
|
+
|
|
|
+      LOG.debug("Executable status for " + f + " set to " + ret);
|
|
|
+      return ret;
|
|
|
+    }
|
|
|
+
|
|
|
+ /**
|
|
|
+     * Permissions rwxr-xr-x
|
|
|
+ */
|
|
|
+ static PermissionsInfo sevenFiveFive =
|
|
|
+ new PermissionsInfo(true, true, true, false, true, false);
|
|
|
+ /**
|
|
|
+ * Completely private permissions
|
|
|
+ */
|
|
|
+ static PermissionsInfo sevenZeroZero =
|
|
|
+ new PermissionsInfo(true, true, true, true, true, true);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Prepare the job directories for a given job. To be called by the job
|
|
|
+ * localization code, only if the job is not already localized.
|
|
|
+ *
|
|
|
+ * <br>
|
|
|
+ * Here, we set 700 permissions on the job directories created on all disks.
|
|
|
+ * This we do so as to avoid any misuse by other users till the time
|
|
|
+ * {@link TaskController#initializeJob(JobInitializationContext)} is run at a
|
|
|
+ * later time to set proper private permissions on the job directories. <br>
|
|
|
+ *
|
|
|
+ * @param jobId
|
|
|
+ * @param fs
|
|
|
+ * @param localDirs
|
|
|
+ * @throws IOException
|
|
|
+ */
|
|
|
+ private static void initializeJobDirs(JobID jobId, FileSystem fs,
|
|
|
+ String[] localDirs)
|
|
|
+ throws IOException {
|
|
|
+ boolean initJobDirStatus = false;
|
|
|
+ String jobDirPath = getLocalJobDir(jobId.toString());
|
|
|
+ for (String localDir : localDirs) {
|
|
|
+ Path jobDir = new Path(localDir, jobDirPath);
|
|
|
+ if (fs.exists(jobDir)) {
|
|
|
+ // this will happen on a partial execution of localizeJob. Sometimes
|
|
|
+ // copying job.xml to the local disk succeeds but copying job.jar might
|
|
|
+ // throw out an exception. We should clean up and then try again.
|
|
|
+ fs.delete(jobDir, true);
|
|
|
+ }
|
|
|
+
|
|
|
+ boolean jobDirStatus = fs.mkdirs(jobDir);
|
|
|
+ if (!jobDirStatus) {
|
|
|
+ LOG.warn("Not able to create job directory " + jobDir.toString());
|
|
|
+ }
|
|
|
+
|
|
|
+ initJobDirStatus = initJobDirStatus || jobDirStatus;
|
|
|
+
|
|
|
+ // job-dir has to be private to the TT
|
|
|
+ PermissionsHandler.setPermissions(new File(jobDir.toUri().getPath()),
|
|
|
+ PermissionsHandler.sevenZeroZero);
|
|
|
+ }
|
|
|
+
|
|
|
+ if (!initJobDirStatus) {
|
|
|
+ throw new IOException("Not able to initialize job directories "
|
|
|
+ + "in any of the configured local directories for job "
|
|
|
+ + jobId.toString());
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Download the job configuration file from the FS.
|
|
|
+ *
|
|
|
+   * @param jobFile path of the job file on userFs that has to be downloaded
|
|
|
+ * @param jobId jobid of the task
|
|
|
+ * @return the local file system path of the downloaded file.
|
|
|
+ * @throws IOException
|
|
|
+ */
|
|
|
+ private Path localizeJobConfFile(Path jobFile, FileSystem userFs, JobID jobId)
|
|
|
+ throws IOException {
|
|
|
+    // Get the size of the job file;
|
|
|
+    // the size is -1 if the file is not present.
|
|
|
+ FileStatus status = null;
|
|
|
+ long jobFileSize = -1;
|
|
|
+ try {
|
|
|
+ status = userFs.getFileStatus(jobFile);
|
|
|
+ jobFileSize = status.getLen();
|
|
|
+ } catch(FileNotFoundException fe) {
|
|
|
+ jobFileSize = -1;
|
|
|
+ }
|
|
|
+ Path localJobFile =
|
|
|
+ lDirAlloc.getLocalPathForWrite(getLocalJobConfFile(jobId.toString()),
|
|
|
+ jobFileSize, fConf);
|
|
|
+
|
|
|
+ // Download job.xml
|
|
|
+ userFs.copyToLocalFile(jobFile, localJobFile);
|
|
|
+ return localJobFile;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Download the job jar file from FS to the local file system and unjar it.
|
|
|
+ * Set the local jar file in the passed configuration.
|
|
|
+ *
|
|
|
+ * @param jobId
|
|
|
+ * @param userFs
|
|
|
+ * @param localJobConf
|
|
|
+ * @throws IOException
|
|
|
+ */
|
|
|
+ private void localizeJobJarFile(JobID jobId, FileSystem userFs,
|
|
|
+ JobConf localJobConf)
|
|
|
+ throws IOException {
|
|
|
+ // copy Jar file to the local FS and unjar it.
|
|
|
+ String jarFile = localJobConf.getJar();
|
|
|
+ FileStatus status = null;
|
|
|
+ long jarFileSize = -1;
|
|
|
+ if (jarFile != null) {
|
|
|
+ Path jarFilePath = new Path(jarFile);
|
|
|
+ try {
|
|
|
+ status = userFs.getFileStatus(jarFilePath);
|
|
|
+ jarFileSize = status.getLen();
|
|
|
+ } catch (FileNotFoundException fe) {
|
|
|
+ jarFileSize = -1;
|
|
|
+ }
|
|
|
+      // Here we ask for five times the size of the jar file
|
|
|
+      // to accommodate unjarring the jar file in the jars directory
|
|
|
+ Path localJarFile =
|
|
|
+ lDirAlloc.getLocalPathForWrite(getJobJarFile(jobId.toString()),
|
|
|
+ 5 * jarFileSize, fConf);
|
|
|
+
|
|
|
+ //Download job.jar
|
|
|
+ userFs.copyToLocalFile(jarFilePath, localJarFile);
|
|
|
+
|
|
|
+ localJobConf.setJar(localJarFile.toString());
|
|
|
+
|
|
|
+ // Also un-jar the job.jar files. We un-jar it so that classes inside
|
|
|
+      // sub-directories, e.g. lib/ and classes/, are available on class-path
|
|
|
+ RunJar.unJar(new File(localJarFile.toString()), new File(localJarFile
|
|
|
+ .getParent().toString()));
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Create taskDirs on all the disks. Otherwise, in some cases, like when
|
|
|
+ * LinuxTaskController is in use, child might wish to balance load across
|
|
|
+ * disks but cannot itself create attempt directory because of the fact that
|
|
|
+ * job directory is writable only by the TT.
|
|
|
+ *
|
|
|
+ * @param jobId
|
|
|
+ * @param attemptId
|
|
|
+ * @param isCleanupAttempt
|
|
|
+ * @param fs
|
|
|
+ * @param localDirs
|
|
|
+ * @throws IOException
|
|
|
+ */
|
|
|
+ private static void initializeAttemptDirs(String jobId, String attemptId,
|
|
|
+ boolean isCleanupAttempt, FileSystem fs, String[] localDirs)
|
|
|
+ throws IOException {
|
|
|
+
|
|
|
+ boolean initStatus = false;
|
|
|
+ String attemptDirPath =
|
|
|
+ getLocalTaskDir(jobId, attemptId, isCleanupAttempt);
|
|
|
+
|
|
|
+ for (String localDir : localDirs) {
|
|
|
+ Path localAttemptDir = new Path(localDir, attemptDirPath);
|
|
|
+
|
|
|
+ boolean attemptDirStatus = fs.mkdirs(localAttemptDir);
|
|
|
+ if (!attemptDirStatus) {
|
|
|
+ LOG.warn("localAttemptDir " + localAttemptDir.toString()
|
|
|
+ + " couldn't be created.");
|
|
|
+ }
|
|
|
+ initStatus = initStatus || attemptDirStatus;
|
|
|
+ }
|
|
|
+
|
|
|
+ if (!initStatus) {
|
|
|
+ throw new IOException("Not able to initialize attempt directories "
|
|
|
+ + "in any of the configured local directories for the attempt "
|
|
|
+ + attemptId);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ private void launchTaskForJob(TaskInProgress tip, JobConf jobConf)
|
|
|
+ throws IOException{
|
|
|
synchronized (tip) {
|
|
|
tip.setJobConf(jobConf);
|
|
|
tip.launchTask();
|
|
@@ -1020,6 +1301,17 @@ public class TaskTracker
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * For testing
|
|
|
+ */
|
|
|
+ TaskTracker() {
|
|
|
+ server = null;
|
|
|
+ }
|
|
|
+
|
|
|
+ void setConf(JobConf conf) {
|
|
|
+ fConf = conf;
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* Start with the local machine name, and the default JobTracker
|
|
|
*/
|
|
@@ -1061,13 +1353,6 @@ public class TaskTracker
|
|
|
initialize();
|
|
|
}
|
|
|
|
|
|
- /**
|
|
|
- * Blank constructor. Only usable by tests.
|
|
|
- */
|
|
|
- TaskTracker() {
|
|
|
- server = null;
|
|
|
- }
|
|
|
-
|
|
|
private void checkJettyPort(int port) throws IOException {
|
|
|
//See HADOOP-4744
|
|
|
if (port < 0) {
|
|
@@ -1695,10 +1980,9 @@ public class TaskTracker
|
|
|
}
|
|
|
|
|
|
MapOutputFile mapOutputFile = new MapOutputFile();
|
|
|
- mapOutputFile.setJobId(taskId.getJobID());
|
|
|
mapOutputFile.setConf(conf);
|
|
|
|
|
|
- Path tmp_output = mapOutputFile.getOutputFile(taskId);
|
|
|
+ Path tmp_output = mapOutputFile.getOutputFile();
|
|
|
if(tmp_output == null)
|
|
|
return 0;
|
|
|
FileSystem localFS = FileSystem.getLocal(conf);
|
|
@@ -1988,54 +2272,36 @@ public class TaskTracker
|
|
|
taskTimeout = (10 * 60 * 1000);
|
|
|
}
|
|
|
|
|
|
- private void localizeTask(Task task) throws IOException{
|
|
|
+ void localizeTask(Task task) throws IOException{
|
|
|
|
|
|
- Path localTaskDir =
|
|
|
- lDirAlloc.getLocalPathForWrite(
|
|
|
- TaskTracker.getLocalTaskDir(task.getJobID().toString(),
|
|
|
- task.getTaskID().toString(), task.isTaskCleanupTask()),
|
|
|
- defaultJobConf );
|
|
|
-
|
|
|
FileSystem localFs = FileSystem.getLocal(fConf);
|
|
|
- if (!localFs.mkdirs(localTaskDir)) {
|
|
|
- throw new IOException("Mkdirs failed to create "
|
|
|
- + localTaskDir.toString());
|
|
|
- }
|
|
|
-
|
|
|
- // create symlink for ../work if it already doesnt exist
|
|
|
- String workDir = lDirAlloc.getLocalPathToRead(
|
|
|
- TaskTracker.getLocalJobDir(task.getJobID().toString())
|
|
|
- + Path.SEPARATOR
|
|
|
- + "work", defaultJobConf).toString();
|
|
|
- String link = localTaskDir.getParent().toString()
|
|
|
- + Path.SEPARATOR + "work";
|
|
|
- File flink = new File(link);
|
|
|
- if (!flink.exists())
|
|
|
- FileUtil.symLink(workDir, link);
|
|
|
-
|
|
|
+
|
|
|
+ // create taskDirs on all the disks.
|
|
|
+ initializeAttemptDirs(task.getJobID().toString(), task.getTaskID()
|
|
|
+ .toString(), task.isTaskCleanupTask(), localFs, fConf
|
|
|
+ .getStrings("mapred.local.dir"));
|
|
|
+
|
|
|
// create the working-directory of the task
|
|
|
- Path cwd = lDirAlloc.getLocalPathForWrite(
|
|
|
- getLocalTaskDir(task.getJobID().toString(),
|
|
|
- task.getTaskID().toString(), task.isTaskCleanupTask())
|
|
|
- + Path.SEPARATOR + MRConstants.WORKDIR,
|
|
|
- defaultJobConf);
|
|
|
+ Path cwd =
|
|
|
+ lDirAlloc.getLocalPathForWrite(getTaskWorkDir(task.getJobID()
|
|
|
+ .toString(), task.getTaskID().toString(), task
|
|
|
+ .isTaskCleanupTask()), defaultJobConf);
|
|
|
if (!localFs.mkdirs(cwd)) {
|
|
|
throw new IOException("Mkdirs failed to create "
|
|
|
+ cwd.toString());
|
|
|
}
|
|
|
|
|
|
- Path localTaskFile = new Path(localTaskDir, "job.xml");
|
|
|
- task.setJobFile(localTaskFile.toString());
|
|
|
localJobConf.set("mapred.local.dir",
|
|
|
fConf.get("mapred.local.dir"));
|
|
|
+
|
|
|
if (fConf.get("slave.host.name") != null) {
|
|
|
localJobConf.set("slave.host.name",
|
|
|
fConf.get("slave.host.name"));
|
|
|
}
|
|
|
|
|
|
- localJobConf.set("mapred.task.id", task.getTaskID().toString());
|
|
|
keepFailedTaskFiles = localJobConf.getKeepFailedTaskFiles();
|
|
|
|
|
|
+ // Do the task-type specific localization
|
|
|
task.localizeConfiguration(localJobConf);
|
|
|
|
|
|
List<String[]> staticResolutions = NetUtils.getAllStaticResolutions();
|
|
@@ -2071,12 +2337,6 @@ public class TaskTracker
|
|
|
if (isTaskMemoryManagerEnabled()) {
|
|
|
localJobConf.setBoolean("task.memory.mgmt.enabled", true);
|
|
|
}
|
|
|
- OutputStream out = localFs.create(localTaskFile);
|
|
|
- try {
|
|
|
- localJobConf.writeXml(out);
|
|
|
- } finally {
|
|
|
- out.close();
|
|
|
- }
|
|
|
task.setConf(localJobConf);
|
|
|
}
|
|
|
|
|
@@ -2349,7 +2609,7 @@ public class TaskTracker
|
|
|
localJobConf). toString());
|
|
|
} catch (IOException e) {
|
|
|
LOG.warn("Working Directory of the task " + task.getTaskID() +
|
|
|
- "doesnt exist. Caught exception " +
|
|
|
+ " doesnt exist. Caught exception " +
|
|
|
StringUtils.stringifyException(e));
|
|
|
}
|
|
|
// Build the command
|
|
@@ -2630,34 +2890,39 @@ public class TaskTracker
|
|
|
if (localJobConf == null) {
|
|
|
return;
|
|
|
}
|
|
|
- String taskDir = getLocalTaskDir(task.getJobID().toString(),
|
|
|
- taskId.toString(), task.isTaskCleanupTask());
|
|
|
+ String localTaskDir =
|
|
|
+ getLocalTaskDir(task.getJobID().toString(), taskId.toString(),
|
|
|
+ task.isTaskCleanupTask());
|
|
|
+ String taskWorkDir =
|
|
|
+ getTaskWorkDir(task.getJobID().toString(), taskId.toString(),
|
|
|
+ task.isTaskCleanupTask());
|
|
|
if (needCleanup) {
|
|
|
if (runner != null) {
|
|
|
//cleans up the output directory of the task (where map outputs
|
|
|
//and reduce inputs get stored)
|
|
|
runner.close();
|
|
|
}
|
|
|
- //We don't delete the workdir
|
|
|
- //since some other task (running in the same JVM)
|
|
|
- //might be using the dir. The JVM running the tasks would clean
|
|
|
- //the workdir per a task in the task process itself.
|
|
|
+
|
|
|
if (localJobConf.getNumTasksToExecutePerJvm() == 1) {
|
|
|
+ // No jvm reuse, remove everything
|
|
|
directoryCleanupThread.addToQueue(localFs,
|
|
|
getLocalFiles(defaultJobConf,
|
|
|
- taskDir));
|
|
|
+ localTaskDir));
|
|
|
}
|
|
|
-
|
|
|
else {
|
|
|
- directoryCleanupThread.addToQueue(localFs,
|
|
|
- getLocalFiles(defaultJobConf,
|
|
|
- taskDir+"/job.xml"));
|
|
|
+ // Jvm reuse. We don't delete the workdir since some other task
|
|
|
+ // (running in the same JVM) might be using the dir. The JVM
|
|
|
+ // running the tasks would clean the workdir per a task in the
|
|
|
+ // task process itself.
|
|
|
+ directoryCleanupThread.addToQueue(localFs, getLocalFiles(
|
|
|
+ defaultJobConf, localTaskDir + Path.SEPARATOR
|
|
|
+ + TaskTracker.JOBFILE));
|
|
|
}
|
|
|
} else {
|
|
|
if (localJobConf.getNumTasksToExecutePerJvm() == 1) {
|
|
|
directoryCleanupThread.addToQueue(localFs,
|
|
|
getLocalFiles(defaultJobConf,
|
|
|
- taskDir+"/work"));
|
|
|
+ taskWorkDir));
|
|
|
}
|
|
|
}
|
|
|
} catch (Throwable ie) {
|