|
@@ -63,7 +63,13 @@ class JobInProgress {
|
|
|
int finishedMapTasks = 0;
|
|
|
int finishedReduceTasks = 0;
|
|
|
int failedMapTasks = 0;
|
|
|
- int failedReduceTasks = 0;
|
|
|
+ int failedReduceTasks = 0;
|
|
|
+
|
|
|
+ int mapFailuresPercent = 0;
|
|
|
+ int reduceFailuresPercent = 0;
|
|
|
+ int failedMapTIPs = 0;
|
|
|
+ int failedReduceTIPs = 0;
|
|
|
+
|
|
|
JobTracker jobtracker = null;
|
|
|
Map<String,List<TaskInProgress>> hostToMaps =
|
|
|
new HashMap<String,List<TaskInProgress>>();
|
|
@@ -91,7 +97,14 @@ class JobInProgress {
|
|
|
|
|
|
private LocalFileSystem localFs;
|
|
|
private String uniqueString;
|
|
|
-
|
|
|
+
|
|
|
+ // Per-job counters
|
|
|
+ public static enum Counter {
|
|
|
+ NUM_FAILED_MAPS,
|
|
|
+ NUM_FAILED_REDUCES
|
|
|
+ }
|
|
|
+ private Counters jobCounters = new Counters();
|
|
|
+
|
|
|
private Counters mapCounters = new Counters();
|
|
|
private Counters reduceCounters = new Counters();
|
|
|
private MetricsRecord jobMetrics;
|
|
@@ -130,6 +143,9 @@ class JobInProgress {
|
|
|
this.numReduceTasks = conf.getNumReduceTasks();
|
|
|
this.taskCompletionEvents = new ArrayList<TaskCompletionEvent>(
|
|
|
numMapTasks + numReduceTasks + 10);
|
|
|
+
|
|
|
+ this.mapFailuresPercent = conf.getMaxMapTaskFailuresPercent();
|
|
|
+ this.reduceFailuresPercent = conf.getMaxReduceTaskFailuresPercent();
|
|
|
|
|
|
JobHistory.JobInfo.logSubmitted(jobid, conf.getJobName(), conf.getUser(),
|
|
|
System.currentTimeMillis(), jobFile);
|
|
@@ -359,14 +375,6 @@ class JobInProgress {
|
|
|
tip.setSuccessEventNumber(taskCompletionEventTracker);
|
|
|
} else if (state == TaskStatus.State.FAILED ||
|
|
|
state == TaskStatus.State.KILLED) {
|
|
|
- taskEvent = new TaskCompletionEvent(
|
|
|
- taskCompletionEventTracker,
|
|
|
- status.getTaskId(),
|
|
|
- tip.idWithinJob(),
|
|
|
- status.getIsMap(),
|
|
|
- TaskCompletionEvent.Status.FAILED,
|
|
|
- httpTaskLogLocation
|
|
|
- );
|
|
|
// Get the event number for the (possibly) previously successful
|
|
|
// task. If there exists one, then set that status to OBSOLETE
|
|
|
int eventNumber;
|
|
@@ -376,9 +384,24 @@ class JobInProgress {
|
|
|
if (t.getTaskId().equals(status.getTaskId()))
|
|
|
t.setTaskStatus(TaskCompletionEvent.Status.OBSOLETE);
|
|
|
}
|
|
|
+
|
|
|
// Tell the job to fail the relevant task
|
|
|
failedTask(tip, status.getTaskId(), status, status.getTaskTracker(),
|
|
|
- wasRunning, wasComplete);
|
|
|
+ wasRunning, wasComplete, metrics);
|
|
|
+
|
|
|
+ // Did the task failure lead to tip failure?
|
|
|
+ TaskCompletionEvent.Status taskCompletionStatus =
|
|
|
+ TaskCompletionEvent.Status.FAILED;
|
|
|
+ if (tip.isFailed()) {
|
|
|
+ taskCompletionStatus = TaskCompletionEvent.Status.TIPFAILED;
|
|
|
+ }
|
|
|
+ taskEvent = new TaskCompletionEvent(taskCompletionEventTracker,
|
|
|
+ status.getTaskId(),
|
|
|
+ tip.idWithinJob(),
|
|
|
+ status.getIsMap(),
|
|
|
+ taskCompletionStatus,
|
|
|
+ httpTaskLogLocation
|
|
|
+ );
|
|
|
}
|
|
|
|
|
|
// Add the 'complete' task i.e. successful/failed
|
|
@@ -411,7 +434,16 @@ class JobInProgress {
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Returns the job-level counters.
|
|
|
+ *
|
|
|
+ * @return the job-level counters.
|
|
|
+ */
|
|
|
+ public synchronized Counters getJobCounters() {
|
|
|
+ return jobCounters;
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* Returns map phase counters by summing over all map tasks in progress.
|
|
|
*/
|
|
@@ -427,11 +459,12 @@ class JobInProgress {
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * Returns the total job counters, by adding together the map and the
|
|
|
- * reduce counters.
|
|
|
+ * Returns the total job counters, by adding together the job,
|
|
|
+ * the map and the reduce counters.
|
|
|
*/
|
|
|
public Counters getCounters() {
|
|
|
- return Counters.sum(getMapCounters(), getReduceCounters());
|
|
|
+ return Counters.sum(getJobCounters(),
|
|
|
+ Counters.sum(getMapCounters(), getReduceCounters()));
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -741,9 +774,27 @@ class JobInProgress {
|
|
|
//
|
|
|
// Figure out whether the Job is done
|
|
|
//
|
|
|
+ isJobComplete(tip, metrics);
|
|
|
+
|
|
|
+ if (this.status.getRunState() != JobStatus.RUNNING) {
|
|
|
+ // The job has been killed/failed,
|
|
|
+ // JobTracker should cleanup this task
|
|
|
+ jobtracker.markCompletedTaskAttempt(status.getTaskTracker(), taskid);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Check if the job is done since all its component tasks are either
|
|
|
+ * successful or have failed.
|
|
|
+ *
|
|
|
+ * @param tip the current tip which completed either successfully or failed
|
|
|
+ * @param metrics job-tracker metrics
|
|
|
+ * @return <code>true</code> if the job is complete, <code>false</code> otherwise
|
|
|
+ */
|
|
|
+ private boolean isJobComplete(TaskInProgress tip, JobTrackerMetrics metrics) {
|
|
|
boolean allDone = true;
|
|
|
for (int i = 0; i < maps.length; i++) {
|
|
|
- if (!maps[i].isComplete()) {
|
|
|
+ if (!(maps[i].isComplete() || maps[i].isFailed())) {
|
|
|
allDone = false;
|
|
|
break;
|
|
|
}
|
|
@@ -753,7 +804,7 @@ class JobInProgress {
|
|
|
this.status.setMapProgress(1.0f);
|
|
|
}
|
|
|
for (int i = 0; i < reduces.length; i++) {
|
|
|
- if (!reduces[i].isComplete()) {
|
|
|
+ if (!(reduces[i].isComplete() || reduces[i].isFailed())) {
|
|
|
allDone = false;
|
|
|
break;
|
|
|
}
|
|
@@ -773,13 +824,11 @@ class JobInProgress {
|
|
|
JobHistory.JobInfo.logFinished(this.status.getJobId(), finishTime,
|
|
|
this.finishedMapTasks, this.finishedReduceTasks, failedMapTasks, failedReduceTasks);
|
|
|
metrics.completeJob();
|
|
|
- } else if (this.status.getRunState() != JobStatus.RUNNING) {
|
|
|
- // The job has been killed/failed,
|
|
|
- // JobTracker should cleanup this task
|
|
|
- jobtracker.markCompletedTaskAttempt(status.getTaskTracker(), taskid);
|
|
|
+ return true;
|
|
|
}
|
|
|
+
|
|
|
+ return false;
|
|
|
}
|
|
|
-
|
|
|
/**
|
|
|
* Kill the job and all its component tasks.
|
|
|
*/
|
|
@@ -809,7 +858,7 @@ class JobInProgress {
|
|
|
* A task assigned to this JobInProgress has reported in as failed.
|
|
|
* Most of the time, we'll just reschedule execution. However, after
|
|
|
* many repeated failures we may instead decide to allow the entire
|
|
|
- * job to fail.
|
|
|
+ * job to fail, or to succeed if the user doesn't care about a few tasks failing.
|
|
|
*
|
|
|
* Even if a task has reported as completed in the past, it might later
|
|
|
* be reported as failed. That's because the TaskTracker that hosts a map
|
|
@@ -819,7 +868,8 @@ class JobInProgress {
|
|
|
*/
|
|
|
private void failedTask(TaskInProgress tip, String taskid,
|
|
|
TaskStatus status, String trackerName,
|
|
|
- boolean wasRunning, boolean wasComplete) {
|
|
|
+ boolean wasRunning, boolean wasComplete,
|
|
|
+ JobTrackerMetrics metrics) {
|
|
|
// Mark the taskid as a 'failure'
|
|
|
tip.incompleteSubTask(taskid, trackerName);
|
|
|
|
|
@@ -881,16 +931,45 @@ class JobInProgress {
|
|
|
jobtracker.markCompletedTaskAttempt(status.getTaskTracker(), taskid);
|
|
|
|
|
|
//
|
|
|
- // Check if we need to kill the job because of too many failures
|
|
|
+ // Check if we need to kill the job because of too many failures or
|
|
|
+ // if the job is complete since all component tasks have completed
|
|
|
//
|
|
|
if (tip.isFailed()) {
|
|
|
- LOG.info("Aborting job " + profile.getJobId());
|
|
|
- JobHistory.Task.logFailed(profile.getJobId(), tip.getTIPId(),
|
|
|
- tip.isMapTask() ? Values.MAP.name():Values.REDUCE.name(),
|
|
|
- System.currentTimeMillis(), status.getDiagnosticInfo());
|
|
|
- JobHistory.JobInfo.logFailed(profile.getJobId(),
|
|
|
- System.currentTimeMillis(), this.finishedMapTasks, this.finishedReduceTasks);
|
|
|
- kill();
|
|
|
+ //
|
|
|
+ // Allow up to 'mapFailuresPercent' of map tasks to fail or
|
|
|
+ // 'reduceFailuresPercent' of reduce tasks to fail
|
|
|
+ //
|
|
|
+ boolean killJob =
|
|
|
+ tip.isMapTask() ?
|
|
|
+ (((++failedMapTIPs*100)/numMapTasks) > mapFailuresPercent) :
|
|
|
+ (((++failedReduceTIPs*100)/numReduceTasks) > reduceFailuresPercent);
|
|
|
+
|
|
|
+ if (killJob) {
|
|
|
+ LOG.info("Aborting job " + profile.getJobId());
|
|
|
+ JobHistory.Task.logFailed(profile.getJobId(), tip.getTIPId(),
|
|
|
+ tip.isMapTask() ?
|
|
|
+ Values.MAP.name() :
|
|
|
+ Values.REDUCE.name(),
|
|
|
+ System.currentTimeMillis(),
|
|
|
+ status.getDiagnosticInfo());
|
|
|
+ JobHistory.JobInfo.logFailed(profile.getJobId(),
|
|
|
+ System.currentTimeMillis(),
|
|
|
+ this.finishedMapTasks,
|
|
|
+ this.finishedReduceTasks
|
|
|
+ );
|
|
|
+ kill();
|
|
|
+ } else {
|
|
|
+ isJobComplete(tip, metrics);
|
|
|
+ }
|
|
|
+
|
|
|
+ //
|
|
|
+ // Update the counters
|
|
|
+ //
|
|
|
+ if (tip.isMapTask()) {
|
|
|
+ jobCounters.incrCounter(Counter.NUM_FAILED_MAPS, 1);
|
|
|
+ } else {
|
|
|
+ jobCounters.incrCounter(Counter.NUM_FAILED_REDUCES, 1);
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
|