|
@@ -729,7 +729,7 @@ public class TaskTracker
|
|
|
} catch (Throwable ie) {
|
|
|
tip.taskStatus.setRunState(TaskStatus.State.FAILED);
|
|
|
try {
|
|
|
- tip.cleanup();
|
|
|
+ tip.cleanup(true);
|
|
|
} catch (Throwable ie2) {
|
|
|
// Ignore it, we are just trying to cleanup.
|
|
|
}
|
|
@@ -1417,7 +1417,7 @@ public class TaskTracker
|
|
|
TaskTracker.getJobCacheSubdir()
|
|
|
+ Path.SEPARATOR + task.getJobID()
|
|
|
+ Path.SEPARATOR + task.getTaskID()
|
|
|
- + Path.SEPARATOR + "work",
|
|
|
+ + Path.SEPARATOR + MRConstants.WORKDIR,
|
|
|
defaultJobConf);
|
|
|
if (!localFs.mkdirs(cwd)) {
|
|
|
throw new IOException("Mkdirs failed to create "
|
|
@@ -1652,7 +1652,7 @@ public class TaskTracker
|
|
|
TaskTracker.getJobCacheSubdir()
|
|
|
+ Path.SEPARATOR + task.getJobID()
|
|
|
+ Path.SEPARATOR + task.getTaskID()
|
|
|
- + Path.SEPARATOR + "work",
|
|
|
+ + Path.SEPARATOR + MRConstants.WORKDIR,
|
|
|
localJobConf). toString());
|
|
|
} catch (IOException e) {
|
|
|
LOG.warn("Working Directory of the task " + task.getTaskID() +
|
|
@@ -1721,12 +1721,13 @@ public class TaskTracker
|
|
|
// later on to downstream job processing.
|
|
|
//
|
|
|
if (needCleanup) {
|
|
|
- try {
|
|
|
- removeTaskFromJob(task.getJobID(), this);
|
|
|
- cleanup();
|
|
|
- } catch (IOException ie) {
|
|
|
- }
|
|
|
+ removeTaskFromJob(task.getJobID(), this);
|
|
|
+ }
|
|
|
+ try {
|
|
|
+ cleanup(needCleanup);
|
|
|
+ } catch (IOException ie) {
|
|
|
}
|
|
|
+
|
|
|
}
|
|
|
|
|
|
|
|
@@ -1824,7 +1825,7 @@ public class TaskTracker
|
|
|
}
|
|
|
|
|
|
// Cleanup on the finished task
|
|
|
- cleanup();
|
|
|
+ cleanup(true);
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -1878,12 +1879,18 @@ public class TaskTracker
|
|
|
* Any calls to cleanup should not lock the tip first.
|
|
|
* cleanup does the right thing- updates tasks in Tasktracker
|
|
|
* by locking tasktracker first and then locks the tip.
|
|
|
+ *
|
|
|
+ * if needCleanup is true, the whole task directory is cleaned up.
|
|
|
+ * otherwise the current working directory of the task
|
|
|
+ * i.e. <taskid>/work is cleaned up.
|
|
|
*/
|
|
|
- void cleanup() throws IOException {
|
|
|
+ void cleanup(boolean needCleanup) throws IOException {
|
|
|
TaskAttemptID taskId = task.getTaskID();
|
|
|
LOG.debug("Cleaning up " + taskId);
|
|
|
synchronized (TaskTracker.this) {
|
|
|
- tasks.remove(taskId);
|
|
|
+ if (needCleanup) {
|
|
|
+ tasks.remove(taskId);
|
|
|
+ }
|
|
|
synchronized (this){
|
|
|
if (alwaysKeepTaskFiles ||
|
|
|
(taskStatus.getRunState() == TaskStatus.State.FAILED &&
|
|
@@ -1894,13 +1901,17 @@ public class TaskTracker
|
|
|
}
|
|
|
synchronized (this) {
|
|
|
try {
|
|
|
- if (runner != null) {
|
|
|
- runner.close();
|
|
|
+ String taskDir = SUBDIR + Path.SEPARATOR + JOBCACHE + Path.SEPARATOR
|
|
|
+ + task.getJobID() + Path.SEPARATOR + taskId;
|
|
|
+ if (needCleanup) {
|
|
|
+ if (runner != null) {
|
|
|
+ runner.close();
|
|
|
+ }
|
|
|
+ defaultJobConf.deleteLocalFiles(taskDir);
|
|
|
+ } else {
|
|
|
+ defaultJobConf.deleteLocalFiles(taskDir + Path.SEPARATOR +
|
|
|
+ MRConstants.WORKDIR);
|
|
|
}
|
|
|
- defaultJobConf.deleteLocalFiles(SUBDIR + Path.SEPARATOR +
|
|
|
- JOBCACHE + Path.SEPARATOR +
|
|
|
- task.getJobID() +
|
|
|
- Path.SEPARATOR + taskId);
|
|
|
} catch (Throwable ie) {
|
|
|
LOG.info("Error cleaning up task runner: " +
|
|
|
StringUtils.stringifyException(ie));
|