|
@@ -115,6 +115,9 @@ public class JobInProgress {
|
|
|
|
|
|
// runningMapTasks include speculative tasks, so we need to capture
|
|
// runningMapTasks include speculative tasks, so we need to capture
|
|
// speculative tasks separately
|
|
// speculative tasks separately
|
|
|
|
+ // For an incomplete task, each running attempt beyond the first is counted
|
|
|
|
+ // in these variables. For a complete task, all of its running attempts are
|
|
|
|
+ // included.
|
|
int speculativeMapTasks = 0;
|
|
int speculativeMapTasks = 0;
|
|
int speculativeReduceTasks = 0;
|
|
int speculativeReduceTasks = 0;
|
|
|
|
|
|
@@ -1738,6 +1741,7 @@ public class JobInProgress {
|
|
String name;
|
|
String name;
|
|
String splits = "";
|
|
String splits = "";
|
|
Enum<Counter> counter = null;
|
|
Enum<Counter> counter = null;
|
|
|
|
+ boolean speculative = tip.getActiveTasks().size() > 1;
|
|
if (tip.isJobSetupTask()) {
|
|
if (tip.isJobSetupTask()) {
|
|
launchedSetup = true;
|
|
launchedSetup = true;
|
|
name = Values.SETUP.name();
|
|
name = Values.SETUP.name();
|
|
@@ -1749,18 +1753,18 @@ public class JobInProgress {
|
|
name = Values.MAP.name();
|
|
name = Values.MAP.name();
|
|
counter = Counter.TOTAL_LAUNCHED_MAPS;
|
|
counter = Counter.TOTAL_LAUNCHED_MAPS;
|
|
splits = tip.getSplitNodes();
|
|
splits = tip.getSplitNodes();
|
|
- if (tip.getActiveTasks().size() > 1)
|
|
|
|
|
|
+ if (speculative)
|
|
speculativeMapTasks++;
|
|
speculativeMapTasks++;
|
|
- metrics.launchMap(id);
|
|
|
|
- this.queueMetrics.launchMap(id);
|
|
|
|
|
|
+ metrics.launchMap(id, speculative);
|
|
|
|
+ this.queueMetrics.launchMap(id, speculative);
|
|
} else {
|
|
} else {
|
|
++runningReduceTasks;
|
|
++runningReduceTasks;
|
|
name = Values.REDUCE.name();
|
|
name = Values.REDUCE.name();
|
|
counter = Counter.TOTAL_LAUNCHED_REDUCES;
|
|
counter = Counter.TOTAL_LAUNCHED_REDUCES;
|
|
- if (tip.getActiveTasks().size() > 1)
|
|
|
|
|
|
+ if (speculative)
|
|
speculativeReduceTasks++;
|
|
speculativeReduceTasks++;
|
|
- metrics.launchReduce(id);
|
|
|
|
- this.queueMetrics.launchReduce(id);
|
|
|
|
|
|
+ metrics.launchReduce(id, speculative);
|
|
|
|
+ this.queueMetrics.launchReduce(id, speculative);
|
|
}
|
|
}
|
|
// Note that the logs are for the scheduled tasks only. Tasks that join on
|
|
// Note that the logs are for the scheduled tasks only. Tasks that join on
|
|
// restart has already their logs in place.
|
|
// restart has already their logs in place.
|
|
@@ -2692,10 +2696,6 @@ public class JobInProgress {
|
|
jobtracker.markCompletedTaskAttempt(status.getTaskTracker(), taskid);
|
|
jobtracker.markCompletedTaskAttempt(status.getTaskTracker(), taskid);
|
|
} else if (tip.isMapTask()) {
|
|
} else if (tip.isMapTask()) {
|
|
runningMapTasks -= 1;
|
|
runningMapTasks -= 1;
|
|
- // check if this was a sepculative task
|
|
|
|
- if (oldNumAttempts > 1) {
|
|
|
|
- speculativeMapTasks -= (oldNumAttempts - newNumAttempts);
|
|
|
|
- }
|
|
|
|
finishedMapTasks += 1;
|
|
finishedMapTasks += 1;
|
|
metrics.completeMap(taskid);
|
|
metrics.completeMap(taskid);
|
|
this.queueMetrics.completeMap(taskid);
|
|
this.queueMetrics.completeMap(taskid);
|
|
@@ -2709,9 +2709,6 @@ public class JobInProgress {
|
|
}
|
|
}
|
|
} else {
|
|
} else {
|
|
runningReduceTasks -= 1;
|
|
runningReduceTasks -= 1;
|
|
- if (oldNumAttempts > 1) {
|
|
|
|
- speculativeReduceTasks -= (oldNumAttempts - newNumAttempts);
|
|
|
|
- }
|
|
|
|
finishedReduceTasks += 1;
|
|
finishedReduceTasks += 1;
|
|
metrics.completeReduce(taskid);
|
|
metrics.completeReduce(taskid);
|
|
this.queueMetrics.completeReduce(taskid);
|
|
this.queueMetrics.completeReduce(taskid);
|
|
@@ -3003,7 +3000,7 @@ public class JobInProgress {
|
|
tip.incompleteSubTask(taskid, this.status);
|
|
tip.incompleteSubTask(taskid, this.status);
|
|
|
|
|
|
boolean isRunning = tip.isRunning();
|
|
boolean isRunning = tip.isRunning();
|
|
- boolean isComplete = tip.isComplete();
|
|
|
|
|
|
+ boolean tipIsComplete = tip.isComplete();
|
|
boolean metricsDone = isComplete(); // job metrics garbage collected
|
|
boolean metricsDone = isComplete(); // job metrics garbage collected
|
|
|
|
|
|
if (wasAttemptRunning) {
|
|
if (wasAttemptRunning) {
|
|
@@ -3018,14 +3015,24 @@ public class JobInProgress {
|
|
// hence we are decrementing the same set.
|
|
// hence we are decrementing the same set.
|
|
// Except after garbageCollect in a different thread.
|
|
// Except after garbageCollect in a different thread.
|
|
if (!tip.isJobCleanupTask() && !tip.isJobSetupTask()) {
|
|
if (!tip.isJobCleanupTask() && !tip.isJobSetupTask()) {
|
|
|
|
+ boolean incWaiting = !tipIsComplete && !metricsDone &&
|
|
|
|
+ tip.getActiveTasks().isEmpty();
|
|
|
|
+ boolean wasSpeculative = wasComplete || !tip.getActiveTasks().isEmpty();
|
|
|
|
+
|
|
if (tip.isMapTask() && !metricsDone) {
|
|
if (tip.isMapTask() && !metricsDone) {
|
|
runningMapTasks -= 1;
|
|
runningMapTasks -= 1;
|
|
- metrics.failedMap(taskid);
|
|
|
|
- this.queueMetrics.failedMap(taskid);
|
|
|
|
|
|
+ metrics.failedMap(taskid, incWaiting);
|
|
|
|
+ this.queueMetrics.failedMap(taskid, incWaiting);
|
|
|
|
+ if (wasSpeculative) {
|
|
|
|
+ speculativeMapTasks--;
|
|
|
|
+ }
|
|
} else if (!metricsDone) {
|
|
} else if (!metricsDone) {
|
|
runningReduceTasks -= 1;
|
|
runningReduceTasks -= 1;
|
|
- metrics.failedReduce(taskid);
|
|
|
|
- this.queueMetrics.failedReduce(taskid);
|
|
|
|
|
|
+ metrics.failedReduce(taskid, incWaiting);
|
|
|
|
+ this.queueMetrics.failedReduce(taskid, incWaiting);
|
|
|
|
+ if (wasSpeculative) {
|
|
|
|
+ speculativeReduceTasks--;
|
|
|
|
+ }
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
@@ -3042,14 +3049,14 @@ public class JobInProgress {
|
|
} else if (tip.isMapTask()) {
|
|
} else if (tip.isMapTask()) {
|
|
// remove from the running queue and put it in the non-running cache
|
|
// remove from the running queue and put it in the non-running cache
|
|
// if the tip is not complete i.e if the tip still needs to be run
|
|
// if the tip is not complete i.e if the tip still needs to be run
|
|
- if (!isComplete) {
|
|
|
|
|
|
+ if (!tipIsComplete) {
|
|
retireMap(tip);
|
|
retireMap(tip);
|
|
failMap(tip);
|
|
failMap(tip);
|
|
}
|
|
}
|
|
} else {
|
|
} else {
|
|
// remove from the running queue and put in the failed queue if the tip
|
|
// remove from the running queue and put in the failed queue if the tip
|
|
// is not complete
|
|
// is not complete
|
|
- if (!isComplete) {
|
|
|
|
|
|
+ if (!tipIsComplete) {
|
|
retireReduce(tip);
|
|
retireReduce(tip);
|
|
failReduce(tip);
|
|
failReduce(tip);
|
|
}
|
|
}
|
|
@@ -3058,7 +3065,7 @@ public class JobInProgress {
|
|
|
|
|
|
// The case when the map was complete but the task tracker went down.
|
|
// The case when the map was complete but the task tracker went down.
|
|
// However, we don't need to do any metering here...
|
|
// However, we don't need to do any metering here...
|
|
- if (wasComplete && !isComplete) {
|
|
|
|
|
|
+ if (wasComplete && !tipIsComplete) {
|
|
if (tip.isMapTask()) {
|
|
if (tip.isMapTask()) {
|
|
// Put the task back in the cache. This will help locality for cases
|
|
// Put the task back in the cache. This will help locality for cases
|
|
// where we have a different TaskTracker from the same rack/switch
|
|
// where we have a different TaskTracker from the same rack/switch
|