|
@@ -338,6 +338,8 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol, MapOutpu
|
|
|
} else {
|
|
|
reduceTotal++;
|
|
|
}
|
|
|
+ }
|
|
|
+ synchronized (tip) {
|
|
|
try {
|
|
|
tip.launchTask();
|
|
|
} catch (Throwable ie) {
|
|
@@ -424,7 +426,7 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol, MapOutpu
|
|
|
* Some fields in the Task object need to be made machine-specific.
|
|
|
* So here, edit the Task's fields appropriately.
|
|
|
*/
|
|
|
- void localizeTask(Task t) throws IOException {
|
|
|
+ private void localizeTask(Task t) throws IOException {
|
|
|
this.jobConf.deleteLocalFiles(SUBDIR + "/" + task.getTaskId());
|
|
|
Path localJobFile =
|
|
|
this.jobConf.getLocalPath(SUBDIR+"/"+t.getTaskId()+"/"+"job.xml");
|
|
@@ -460,7 +462,7 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol, MapOutpu
|
|
|
|
|
|
/**
|
|
|
*/
|
|
|
- public TaskStatus createStatus() {
|
|
|
+ public synchronized TaskStatus createStatus() {
|
|
|
TaskStatus status = new TaskStatus(task.getTaskId(), task.isMapTask(), progress, runstate, diagnosticInfo.toString(), (stateString == null) ? "" : stateString, "");
|
|
|
if (diagnosticInfo.length() > 0) {
|
|
|
diagnosticInfo = new StringBuffer();
|
|
@@ -520,7 +522,7 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol, MapOutpu
|
|
|
/**
|
|
|
* The task has actually finished running.
|
|
|
*/
|
|
|
- public synchronized void taskFinished() {
|
|
|
+ public void taskFinished() {
|
|
|
long start = System.currentTimeMillis();
|
|
|
|
|
|
//
|
|
@@ -538,13 +540,17 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol, MapOutpu
|
|
|
// Change state to success or failure, depending on whether
|
|
|
// task was 'done' before terminating
|
|
|
//
|
|
|
- if (done) {
|
|
|
- runstate = TaskStatus.SUCCEEDED;
|
|
|
- } else {
|
|
|
- if (!wasKilled) {
|
|
|
- failures += 1;
|
|
|
- }
|
|
|
- runstate = TaskStatus.FAILED;
|
|
|
+ boolean needCleanup = false;
|
|
|
+ synchronized (this) {
|
|
|
+ if (done) {
|
|
|
+ runstate = TaskStatus.SUCCEEDED;
|
|
|
+ } else {
|
|
|
+ if (!wasKilled) {
|
|
|
+ failures += 1;
|
|
|
+ }
|
|
|
+ runstate = TaskStatus.FAILED;
|
|
|
+ }
|
|
|
+ needCleanup = wasKilled || runstate == TaskStatus.FAILED;
|
|
|
}
|
|
|
|
|
|
//
|
|
@@ -553,7 +559,7 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol, MapOutpu
|
|
|
// if the task succeeded, and its results might be useful
|
|
|
// later on to downstream job processing.
|
|
|
//
|
|
|
- if (wasKilled || runstate == TaskStatus.FAILED) {
|
|
|
+ if (needCleanup) {
|
|
|
try {
|
|
|
cleanup();
|
|
|
} catch (IOException ie) {
|
|
@@ -594,10 +600,8 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol, MapOutpu
|
|
|
if (runstate == TaskStatus.SUCCEEDED) {
|
|
|
LOG.info("Reporting output lost:"+task.getTaskId());
|
|
|
runstate = TaskStatus.FAILED; // change status to failure
|
|
|
- synchronized (TaskTracker.this) { // force into next heartbeat
|
|
|
- runningTasks.put(task.getTaskId(), this);
|
|
|
- mapTotal++;
|
|
|
- }
|
|
|
+ runningTasks.put(task.getTaskId(), this);
|
|
|
+ mapTotal++;
|
|
|
} else {
|
|
|
LOG.warning("Output already reported lost:"+task.getTaskId());
|
|
|
}
|
|
@@ -699,8 +703,11 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol, MapOutpu
|
|
|
/**
|
|
|
* The task is no longer running. It may not have completed successfully
|
|
|
*/
|
|
|
- synchronized void reportTaskFinished(String taskid) {
|
|
|
- TaskInProgress tip = (TaskInProgress) tasks.get(taskid);
|
|
|
+ void reportTaskFinished(String taskid) {
|
|
|
+ TaskInProgress tip;
|
|
|
+ synchronized (this) {
|
|
|
+ tip = (TaskInProgress) tasks.get(taskid);
|
|
|
+ }
|
|
|
if (tip != null) {
|
|
|
tip.taskFinished();
|
|
|
} else {
|