|
@@ -980,7 +980,7 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol,
|
|
* of this job as a starting point.
|
|
* of this job as a starting point.
|
|
* @throws IOException
|
|
* @throws IOException
|
|
*/
|
|
*/
|
|
- Path initializeJob(final Task t, RunningJob rjob)
|
|
|
|
|
|
+ Path initializeJob(final Task t, final RunningJob rjob)
|
|
throws IOException, InterruptedException {
|
|
throws IOException, InterruptedException {
|
|
final JobID jobId = t.getJobID();
|
|
final JobID jobId = t.getJobID();
|
|
|
|
|
|
@@ -1006,47 +1006,42 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol,
|
|
// Download the job.xml for this job from the system FS
|
|
// Download the job.xml for this job from the system FS
|
|
final Path localJobFile =
|
|
final Path localJobFile =
|
|
localizeJobConfFile(new Path(t.getJobFile()), userName, userFs, jobId);
|
|
localizeJobConfFile(new Path(t.getJobFile()), userName, userFs, jobId);
|
|
- JobConf localJobConf = new JobConf(localJobFile);
|
|
|
|
-
|
|
|
|
- // Setup the public distributed cache
|
|
|
|
- TaskDistributedCacheManager taskDistributedCacheManager =
|
|
|
|
- getTrackerDistributedCacheManager()
|
|
|
|
- .newTaskDistributedCacheManager(jobId, localJobConf);
|
|
|
|
- rjob.distCacheMgr = taskDistributedCacheManager;
|
|
|
|
- taskDistributedCacheManager.setupCache(TaskTracker.getPublicDistributedCacheDir(),
|
|
|
|
- TaskTracker.getPrivateDistributedCacheDir(userName));
|
|
|
|
-
|
|
|
|
- // Set some config values
|
|
|
|
- localJobConf.set(JobConf.MAPRED_LOCAL_DIR_PROPERTY,
|
|
|
|
- getJobConf().get(JobConf.MAPRED_LOCAL_DIR_PROPERTY));
|
|
|
|
- if (conf.get("slave.host.name") != null) {
|
|
|
|
- localJobConf.set("slave.host.name", conf.get("slave.host.name"));
|
|
|
|
- }
|
|
|
|
- resetNumTasksPerJvm(localJobConf);
|
|
|
|
- localJobConf.setUser(t.getUser());
|
|
|
|
-
|
|
|
|
- // write back the config (this config will have the updates that the
|
|
|
|
- // distributed cache manager makes as well)
|
|
|
|
- JobLocalizer.writeLocalJobFile(localJobFile, localJobConf);
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
* Now initialize the job via task-controller to do the rest of the
|
|
* Now initialize the job via task-controller to do the rest of the
|
|
- * job-init. Do this within a doAs since the distributed cache is also set
|
|
|
|
- * up in {@link TaskController#initializeJob(String, JobID, Path, Path)}
|
|
|
|
|
|
+ * job-init. Do this within a doAs since the public distributed cache
|
|
|
|
+ * is also set up here.
|
|
* To support potential authenticated HDFS accesses, we need the tokens
|
|
* To support potential authenticated HDFS accesses, we need the tokens
|
|
*/
|
|
*/
|
|
rjob.ugi.doAs(new PrivilegedExceptionAction<Object>() {
|
|
rjob.ugi.doAs(new PrivilegedExceptionAction<Object>() {
|
|
public Object run() throws IOException {
|
|
public Object run() throws IOException {
|
|
try {
|
|
try {
|
|
|
|
+ final JobConf localJobConf = new JobConf(localJobFile);
|
|
|
|
+ // Setup the public distributed cache
|
|
|
|
+ TaskDistributedCacheManager taskDistributedCacheManager =
|
|
|
|
+ getTrackerDistributedCacheManager()
|
|
|
|
+ .newTaskDistributedCacheManager(jobId, localJobConf);
|
|
|
|
+ rjob.distCacheMgr = taskDistributedCacheManager;
|
|
|
|
+ taskDistributedCacheManager.setupCache(TaskTracker.getPublicDistributedCacheDir(),
|
|
|
|
+ TaskTracker.getPrivateDistributedCacheDir(userName));
|
|
|
|
+
|
|
|
|
+ // Set some config values
|
|
|
|
+ localJobConf.set(JobConf.MAPRED_LOCAL_DIR_PROPERTY,
|
|
|
|
+ getJobConf().get(JobConf.MAPRED_LOCAL_DIR_PROPERTY));
|
|
|
|
+ if (conf.get("slave.host.name") != null) {
|
|
|
|
+ localJobConf.set("slave.host.name", conf.get("slave.host.name"));
|
|
|
|
+ }
|
|
|
|
+ resetNumTasksPerJvm(localJobConf);
|
|
|
|
+ localJobConf.setUser(t.getUser());
|
|
|
|
+
|
|
|
|
+ // write back the config (this config will have the updates that the
|
|
|
|
+ // distributed cache manager makes as well)
|
|
|
|
+ JobLocalizer.writeLocalJobFile(localJobFile, localJobConf);
|
|
taskController.initializeJob(t.getUser(), jobId.toString(),
|
|
taskController.initializeJob(t.getUser(), jobId.toString(),
|
|
new Path(localJobTokenFile), localJobFile, TaskTracker.this);
|
|
new Path(localJobTokenFile), localJobFile, TaskTracker.this);
|
|
} catch (IOException e) {
|
|
} catch (IOException e) {
|
|
- try {
|
|
|
|
- // called holding lock on RunningJob
|
|
|
|
- removeJobFiles(t.getUser(), jobId);
|
|
|
|
- } catch (IOException e2) {
|
|
|
|
- LOG.warn("Failed to add " + jobId + " to cleanup queue", e2);
|
|
|
|
- }
|
|
|
|
|
|
+ LOG.warn("Exception while localization " +
|
|
|
|
+ StringUtils.stringifyException(e));
|
|
throw e;
|
|
throw e;
|
|
}
|
|
}
|
|
return null;
|
|
return null;
|
|
@@ -1727,7 +1722,7 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol,
|
|
}
|
|
}
|
|
// Delete the job directory for this
|
|
// Delete the job directory for this
|
|
// task if the job is done/failed
|
|
// task if the job is done/failed
|
|
- if (!rjob.keepJobFiles && rjob.localized) {
|
|
|
|
|
|
+ if (!rjob.keepJobFiles) {
|
|
removeJobFiles(rjob.jobConf.getUser(), rjob.getJobID());
|
|
removeJobFiles(rjob.jobConf.getUser(), rjob.getJobID());
|
|
}
|
|
}
|
|
// add job to user log manager
|
|
// add job to user log manager
|