|
@@ -284,7 +284,7 @@ public class TaskTracker
|
|
|
tip.setJobConf(jobConf);
|
|
|
tip.launchTask();
|
|
|
} catch (Throwable ie) {
|
|
|
- tip.runstate = TaskStatus.FAILED;
|
|
|
+ tip.runstate = TaskStatus.State.FAILED;
|
|
|
try {
|
|
|
tip.cleanup();
|
|
|
} catch (Throwable ie2) {
|
|
@@ -460,7 +460,7 @@ public class TaskTracker
|
|
|
for (Iterator it = taskReports.iterator();
|
|
|
it.hasNext(); ) {
|
|
|
TaskStatus taskStatus = (TaskStatus) it.next();
|
|
|
- if (taskStatus.getRunState() != TaskStatus.RUNNING) {
|
|
|
+ if (taskStatus.getRunState() != TaskStatus.State.RUNNING) {
|
|
|
if (taskStatus.getIsMap()) {
|
|
|
mapTotal--;
|
|
|
} else {
|
|
@@ -509,7 +509,7 @@ public class TaskTracker
|
|
|
TaskInProgress tip = (TaskInProgress) it.next();
|
|
|
long timeSinceLastReport = System.currentTimeMillis() -
|
|
|
tip.getLastProgressReport();
|
|
|
- if ((tip.getRunState() == TaskStatus.RUNNING) &&
|
|
|
+ if ((tip.getRunState() == TaskStatus.State.RUNNING) &&
|
|
|
(timeSinceLastReport > this.taskTimeout) &&
|
|
|
!tip.wasKilled) {
|
|
|
String msg = "Task failed to report status for " +
|
|
@@ -577,7 +577,7 @@ public class TaskTracker
|
|
|
|
|
|
for (Iterator it = runningTasks.values().iterator(); it.hasNext(); ) {
|
|
|
TaskInProgress tip = (TaskInProgress) it.next();
|
|
|
- if ((tip.getRunState() == TaskStatus.RUNNING) &&
|
|
|
+ if ((tip.getRunState() == TaskStatus.State.RUNNING) &&
|
|
|
!tip.wasKilled) {
|
|
|
|
|
|
if (killMe == null) {
|
|
@@ -770,7 +770,7 @@ public class TaskTracker
|
|
|
class TaskInProgress {
|
|
|
Task task;
|
|
|
float progress;
|
|
|
- int runstate;
|
|
|
+ TaskStatus.State runstate;
|
|
|
long lastProgressReport;
|
|
|
StringBuffer diagnosticInfo = new StringBuffer();
|
|
|
TaskRunner runner;
|
|
@@ -788,7 +788,7 @@ public class TaskTracker
|
|
|
public TaskInProgress(Task task, JobConf conf) {
|
|
|
this.task = task;
|
|
|
this.progress = 0.0f;
|
|
|
- this.runstate = TaskStatus.UNASSIGNED;
|
|
|
+ this.runstate = TaskStatus.State.UNASSIGNED;
|
|
|
this.lastProgressReport = System.currentTimeMillis();
|
|
|
this.defaultJobConf = conf;
|
|
|
localJobConf = null;
|
|
@@ -862,7 +862,7 @@ public class TaskTracker
|
|
|
*/
|
|
|
public synchronized void launchTask() throws IOException {
|
|
|
localizeTask(task);
|
|
|
- this.runstate = TaskStatus.RUNNING;
|
|
|
+ this.runstate = TaskStatus.State.RUNNING;
|
|
|
this.runner = task.createRunner(TaskTracker.this);
|
|
|
this.runner.start();
|
|
|
this.taskStatus.setStartTime(System.currentTimeMillis());
|
|
@@ -874,7 +874,7 @@ public class TaskTracker
|
|
|
public synchronized void reportProgress(float p, String state, Phase newPhase) {
|
|
|
LOG.info(task.getTaskId()+" "+p+"% "+state);
|
|
|
this.progress = p;
|
|
|
- this.runstate = TaskStatus.RUNNING;
|
|
|
+ this.runstate = TaskStatus.State.RUNNING;
|
|
|
this.lastProgressReport = System.currentTimeMillis();
|
|
|
Phase oldPhase = taskStatus.getPhase() ;
|
|
|
if( oldPhase != newPhase ){
|
|
@@ -896,7 +896,7 @@ public class TaskTracker
|
|
|
|
|
|
/**
|
|
|
*/
|
|
|
- public int getRunState() {
|
|
|
+ public TaskStatus.State getRunState() {
|
|
|
return runstate;
|
|
|
}
|
|
|
|
|
@@ -941,16 +941,19 @@ public class TaskTracker
|
|
|
boolean needCleanup = false;
|
|
|
synchronized (this) {
|
|
|
if (done) {
|
|
|
- runstate = TaskStatus.SUCCEEDED;
|
|
|
+ runstate = TaskStatus.State.SUCCEEDED;
|
|
|
} else {
|
|
|
if (!wasKilled) {
|
|
|
failures += 1;
|
|
|
+ runstate = TaskStatus.State.FAILED;
|
|
|
+ } else {
|
|
|
+ runstate = TaskStatus.State.KILLED;
|
|
|
}
|
|
|
- runstate = TaskStatus.FAILED;
|
|
|
progress = 0.0f;
|
|
|
}
|
|
|
this.taskStatus.setFinishTime(System.currentTimeMillis());
|
|
|
- needCleanup = runstate == TaskStatus.FAILED;
|
|
|
+ needCleanup = (runstate == TaskStatus.State.FAILED) |
|
|
|
+ (runstate == TaskStatus.State.KILLED);
|
|
|
}
|
|
|
|
|
|
//
|
|
@@ -973,7 +976,7 @@ public class TaskTracker
|
|
|
*/
|
|
|
public synchronized void jobHasFinished() throws IOException {
|
|
|
|
|
|
- if (getRunState() == TaskStatus.RUNNING) {
|
|
|
+ if (getRunState() == TaskStatus.State.RUNNING) {
|
|
|
killAndCleanup(false);
|
|
|
} else {
|
|
|
cleanup();
|
|
@@ -991,7 +994,7 @@ public class TaskTracker
|
|
|
*/
|
|
|
public synchronized void killAndCleanup(boolean wasFailure
|
|
|
) throws IOException {
|
|
|
- if (runstate == TaskStatus.RUNNING) {
|
|
|
+ if (runstate == TaskStatus.State.RUNNING) {
|
|
|
wasKilled = true;
|
|
|
if (wasFailure) {
|
|
|
failures += 1;
|
|
@@ -1005,9 +1008,9 @@ public class TaskTracker
|
|
|
*/
|
|
|
public synchronized void mapOutputLost(String failure
|
|
|
) throws IOException {
|
|
|
- if (runstate == TaskStatus.SUCCEEDED) {
|
|
|
+ if (runstate == TaskStatus.State.SUCCEEDED) {
|
|
|
LOG.info("Reporting output lost:"+task.getTaskId());
|
|
|
- runstate = TaskStatus.FAILED; // change status to failure
|
|
|
+ runstate = TaskStatus.State.FAILED; // change status to failure
|
|
|
progress = 0.0f;
|
|
|
reportDiagnosticInfo("Map output lost, rescheduling: " +
|
|
|
failure);
|
|
@@ -1029,7 +1032,7 @@ public class TaskTracker
|
|
|
synchronized (TaskTracker.this) {
|
|
|
tasks.remove(taskId);
|
|
|
if (alwaysKeepTaskFiles ||
|
|
|
- (runstate == TaskStatus.FAILED &&
|
|
|
+ (runstate == TaskStatus.State.FAILED &&
|
|
|
keepFailedTaskFiles)) {
|
|
|
return;
|
|
|
}
|