|
@@ -3262,13 +3262,17 @@ public class JobInProgress {
|
|
|
|
|
|
synchronized void fetchFailureNotification(TaskInProgress tip,
|
|
|
TaskAttemptID mapTaskId,
|
|
|
- String trackerName) {
|
|
|
+ String mapTrackerName,
|
|
|
+ TaskAttemptID reduceTaskId,
|
|
|
+ String reduceTrackerName) {
|
|
|
Integer fetchFailures = mapTaskIdToFetchFailuresMap.get(mapTaskId);
|
|
|
fetchFailures = (fetchFailures == null) ? 1 : (fetchFailures+1);
|
|
|
mapTaskIdToFetchFailuresMap.put(mapTaskId, fetchFailures);
|
|
|
- LOG.info("Failed fetch notification #" + fetchFailures + " for task " +
|
|
|
- mapTaskId);
|
|
|
-
|
|
|
+ LOG.info("Failed fetch notification #" + fetchFailures + " for map task: "
|
|
|
+ + mapTaskId + " running on tracker: " + mapTrackerName
|
|
|
+ + " and reduce task: " + reduceTaskId + " running on tracker: "
|
|
|
+ + reduceTrackerName);
|
|
|
+
|
|
|
float failureRate = (float)fetchFailures / runningReduceTasks;
|
|
|
// declare faulty if fetch-failures >= max-allowed-failures
|
|
|
boolean isMapFaulty = failureRate >= MAX_ALLOWED_FETCH_FAILURES_PERCENT;
|
|
@@ -3280,7 +3284,7 @@ public class JobInProgress {
|
|
|
failedTask(tip, mapTaskId, "Too many fetch-failures",
|
|
|
(tip.isMapTask() ? TaskStatus.Phase.MAP :
|
|
|
TaskStatus.Phase.REDUCE),
|
|
|
- TaskStatus.State.FAILED, trackerName);
|
|
|
+ TaskStatus.State.FAILED, mapTrackerName);
|
|
|
|
|
|
mapTaskIdToFetchFailuresMap.remove(mapTaskId);
|
|
|
}
|