|
@@ -69,6 +69,7 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol, MapOutpu
|
|
|
private MapOutputFile mapOutputFile;
|
|
|
|
|
|
private int maxCurrentTasks;
|
|
|
+ private int failures;
|
|
|
|
|
|
class MapOutputServer extends RPC.Server {
|
|
|
private MapOutputServer(int port, int threads) {
|
|
@@ -255,7 +256,10 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol, MapOutpu
|
|
|
this.fs = FileSystem.getNamed(jobClient.getFilesystemName(), this.fConf);
|
|
|
}
|
|
|
|
|
|
- int resultCode = jobClient.emitHeartbeat(new TaskTrackerStatus(taskTrackerName, localHostname, mapOutputPort, taskReports), justStarted);
|
|
|
+ TaskTrackerStatus status =
|
|
|
+ new TaskTrackerStatus(taskTrackerName, localHostname,
|
|
|
+ mapOutputPort, taskReports, failures);
|
|
|
+ int resultCode = jobClient.emitHeartbeat(status, justStarted);
|
|
|
justStarted = false;
|
|
|
|
|
|
if (resultCode == InterTrackerProtocol.UNKNOWN_TASKTRACKER) {
|
|
@@ -279,10 +283,11 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol, MapOutpu
|
|
|
for (Iterator it = runningTasks.values().iterator(); it.hasNext(); ) {
|
|
|
TaskInProgress tip = (TaskInProgress) it.next();
|
|
|
if ((tip.getRunState() == TaskStatus.RUNNING) &&
|
|
|
- (System.currentTimeMillis() - tip.getLastProgressReport() > this.taskTimeout)) {
|
|
|
+ (System.currentTimeMillis() - tip.getLastProgressReport() > this.taskTimeout) &&
|
|
|
+ !tip.wasKilled) {
|
|
|
LOG.info("Task " + tip.getTask().getTaskId() + " timed out. Killing.");
|
|
|
tip.reportDiagnosticInfo("Timed out.");
|
|
|
- tip.killAndCleanup();
|
|
|
+ tip.killAndCleanup(true);
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -531,6 +536,9 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol, MapOutpu
|
|
|
if (done) {
|
|
|
runstate = TaskStatus.SUCCEEDED;
|
|
|
} else {
|
|
|
+ if (!wasKilled) {
|
|
|
+ failures += 1;
|
|
|
+ }
|
|
|
runstate = TaskStatus.FAILED;
|
|
|
}
|
|
|
|
|
@@ -554,7 +562,7 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol, MapOutpu
|
|
|
*/
|
|
|
public synchronized void jobHasFinished() throws IOException {
|
|
|
if (getRunState() == TaskStatus.RUNNING) {
|
|
|
- killAndCleanup();
|
|
|
+ killAndCleanup(false);
|
|
|
} else {
|
|
|
cleanup();
|
|
|
}
|
|
@@ -563,9 +571,13 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol, MapOutpu
|
|
|
/**
|
|
|
* This task has run on too long, and should be killed.
|
|
|
*/
|
|
|
- public synchronized void killAndCleanup() throws IOException {
|
|
|
+ public synchronized void killAndCleanup(boolean wasFailure
|
|
|
+ ) throws IOException {
|
|
|
if (runstate == TaskStatus.RUNNING) {
|
|
|
wasKilled = true;
|
|
|
+ if (wasFailure) {
|
|
|
+ failures += 1;
|
|
|
+ }
|
|
|
runner.kill();
|
|
|
}
|
|
|
}
|