|
@@ -2050,10 +2050,16 @@ public class TaskTracker
|
|
|
if (this.done ||
|
|
|
(this.taskStatus.getRunState() != TaskStatus.State.RUNNING &&
|
|
|
this.taskStatus.getRunState() != TaskStatus.State.COMMIT_PENDING &&
|
|
|
- !isCleaningup())) {
|
|
|
+ !isCleaningup()) ||
|
|
|
+ ((this.taskStatus.getRunState() == TaskStatus.State.COMMIT_PENDING ||
|
|
|
+ this.taskStatus.getRunState() == TaskStatus.State.FAILED_UNCLEAN ||
|
|
|
+ this.taskStatus.getRunState() == TaskStatus.State.KILLED_UNCLEAN) &&
|
|
|
+ taskStatus.getRunState() == TaskStatus.State.RUNNING)) {
|
|
|
//make sure we ignore progress messages after a task has
|
|
|
//invoked TaskUmbilicalProtocol.done() or if the task has been
|
|
|
- //KILLED/FAILED
|
|
|
+ //KILLED/FAILED/FAILED_UNCLEAN/KILLED_UNCLEAN
|
|
|
+ //Also ignore progress update if the state change is from
|
|
|
+ //COMMIT_PENDING/FAILED_UNCLEAN/KILLED_UNCLEA to RUNNING
|
|
|
LOG.info(task.getTaskID() + " Ignoring status-update since " +
|
|
|
((this.done) ? "task is 'done'" :
|
|
|
("runState: " + this.taskStatus.getRunState()))
|
|
@@ -2407,7 +2413,10 @@ public class TaskTracker
|
|
|
if (wasFailure) {
|
|
|
failures += 1;
|
|
|
}
|
|
|
- runner.kill();
|
|
|
+ // runner could be null if task-cleanup attempt is not localized yet
|
|
|
+ if (runner != null) {
|
|
|
+ runner.kill();
|
|
|
+ }
|
|
|
setTaskFailState(wasFailure);
|
|
|
} else if (taskStatus.getRunState() == TaskStatus.State.UNASSIGNED) {
|
|
|
if (wasFailure) {
|
|
@@ -2486,6 +2495,11 @@ public class TaskTracker
|
|
|
}
|
|
|
synchronized (this) {
|
|
|
try {
|
|
|
+ // localJobConf could be null if localization has not happened
|
|
|
+ // then no cleanup will be required.
|
|
|
+ if (localJobConf == null) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
String taskDir = getLocalTaskDir(task.getJobID().toString(),
|
|
|
taskId.toString(), task.isTaskCleanupTask());
|
|
|
if (needCleanup) {
|
|
@@ -2622,7 +2636,8 @@ public class TaskTracker
|
|
|
public synchronized void commitPending(TaskAttemptID taskid,
|
|
|
TaskStatus taskStatus)
|
|
|
throws IOException {
|
|
|
- LOG.info("Task " + taskid + " is in COMMIT_PENDING");
|
|
|
+ LOG.info("Task " + taskid + " is in commit-pending," +"" +
|
|
|
+ " task state:" +taskStatus.getRunState());
|
|
|
statusUpdate(taskid, taskStatus);
|
|
|
reportTaskFinished(taskid, true);
|
|
|
}
|