Parcourir la source

HADOOP-5282. Fixed job history logs for task attempts that are failed by the JobTracker, say due to lost task trackers. Contributed by Amar Kamat.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/trunk@746274 13f79535-47bb-0310-9956-ffa450edef68
Hemanth Yamijala il y a 16 ans
Parent
commit
0d296a882d
2 fichiers modifiés avec 27 ajouts et 13 suppressions
  1. 3 0
      CHANGES.txt
  2. 24 13
      src/mapred/org/apache/hadoop/mapred/JobInProgress.java

+ 3 - 0
CHANGES.txt

@@ -818,6 +818,9 @@ Release 0.20.0 - Unreleased
     job is not in memory and a tasktracker comes to the jobtracker with a status
     report of a task belonging to that job. (Amar Kamat via ddas)
 
+    HADOOP-5282. Fixed job history logs for task attempts that are failed by the
+    JobTracker, say due to lost task trackers. (Amar Kamat via yhemanth)
+
 Release 0.19.1 - Unreleased
 
   IMPROVEMENTS

+ 24 - 13
src/mapred/org/apache/hadoop/mapred/JobInProgress.java

@@ -1992,10 +1992,7 @@ class JobInProgress {
     TaskTrackerStatus ttStatus = 
       this.jobtracker.getTaskTracker(status.getTaskTracker());
     String trackerHostname = jobtracker.getNode(ttStatus.getHost()).toString();
-    String taskType = tip.isJobCleanupTask() ? Values.CLEANUP.name() :
-                      tip.isJobSetupTask() ? Values.SETUP.name() :
-                      tip.isMapTask() ? Values.MAP.name() : 
-                      Values.REDUCE.name();
+    String taskType = getTaskType(tip);
     if (status.getIsMap()){
       JobHistory.MapAttempt.logStarted(status.getTaskID(), status.getStartTime(), 
                                        status.getTaskTracker(), 
@@ -2306,10 +2303,7 @@ class JobInProgress {
     List<String> taskDiagnosticInfo = tip.getDiagnosticInfo(taskid);
     String diagInfo = taskDiagnosticInfo == null ? "" :
       StringUtils.arrayToString(taskDiagnosticInfo.toArray(new String[0]));
-    String taskType = tip.isJobCleanupTask() ? Values.CLEANUP.name() :
-                      tip.isJobSetupTask() ? Values.SETUP.name() :
-                      tip.isMapTask() ? Values.MAP.name() : 
-                      Values.REDUCE.name();
+    String taskType = getTaskType(tip);
     if (taskStatus.getIsMap()) {
       JobHistory.MapAttempt.logStarted(taskid, startTime, 
         taskTrackerName, taskTrackerPort, taskType);
@@ -2432,12 +2426,14 @@ class JobInProgress {
                                                     trackerName, phase,
                                                     new Counters());
     status.setFinishTime(System.currentTimeMillis());
+    boolean wasComplete = tip.isComplete();
     updateTaskStatus(tip, status, metrics);
-    JobHistory.Task.logFailed(tip.getTIPId(), 
-                              tip.isJobCleanupTask() ? Values.CLEANUP.name() : 
-                              tip.isJobSetupTask() ? Values.SETUP.name() : 
-                              tip.isMapTask() ? Values.MAP.name() : Values.REDUCE.name(), 
-                              tip.getExecFinishTime(), reason, taskid); 
+    boolean isComplete = tip.isComplete();
+    if (wasComplete && !isComplete) { // mark a successful tip as failed
+      String taskType = getTaskType(tip);
+      JobHistory.Task.logFailed(tip.getTIPId(), taskType, 
+                                tip.getExecFinishTime(), reason, taskid);
+    }
   }
        
                            
@@ -2611,4 +2607,19 @@ class JobInProgress {
   boolean isComplete() {
     return status.isJobComplete();
   }
+  
+  /**
+   * Get the task type for logging it to {@link JobHistory}.
+   */
+  private String getTaskType(TaskInProgress tip) {
+    if (tip.isJobCleanupTask()) {
+      return Values.CLEANUP.name();
+    } else if (tip.isJobSetupTask()) {
+      return Values.SETUP.name();
+    } else if (tip.isMapTask()) {
+      return Values.MAP.name();
+    } else {
+      return Values.REDUCE.name();
+    }
+  }
 }