Explorar o código

HADOOP-4112. Handles cleanupTask in JobHistory. Contributed by Amareshwari Sriramadasu.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/trunk@694694 13f79535-47bb-0310-9956-ffa450edef68
Devaraj Das %!s(int64=16) %!d(string=hai) anos
pai
achega
207dc3e823

+ 3 - 0
CHANGES.txt

@@ -534,6 +534,9 @@ Trunk (unreleased changes)
     HADOOP-4094. Hive now has hive-default.xml and hive-site.xml similar
     to core hadoop. (Prasad Chakka via dhruba)
 
+    HADOOP-4112. Handles cleanupTask in JobHistory 
+    (Amareshwari Sriramadasu via ddas)
+
 Release 0.18.1 - Unreleased
 
   IMPROVEMENTS

+ 37 - 2
src/mapred/org/apache/hadoop/mapred/HistoryViewer.java

@@ -97,11 +97,15 @@ class HistoryViewer {
     printTasks("MAP", "KILLED");
     printTasks("REDUCE", "FAILED");
     printTasks("REDUCE", "KILLED");
+    printTasks("CLEANUP", "FAILED");
+    printTasks("CLEANUP", "KILLED");
     if (printAll) {
       printTasks("MAP", "SUCCESS");
       printTasks("REDUCE", "SUCCESS");
+      printTasks("CLEANUP", "SUCCESS");
       printAllTaskAttempts("MAP");
       printAllTaskAttempts("REDUCE");
+      printAllTaskAttempts("CLEANUP");
     }
     NodesFilter filter = new FailedOnNodesFilter();
     printFailedAttempts(filter);
@@ -212,14 +216,20 @@ class HistoryViewer {
     Map<String, JobHistory.Task> tasks = job.getAllTasks();
     int totalMaps = 0; 
     int totalReduces = 0; 
+    int totalCleanups = 0;
     int numFailedMaps = 0; 
     int numKilledMaps = 0;
     int numFailedReduces = 0; 
     int numKilledReduces = 0;
+    int numFinishedCleanups = 0;
+    int numFailedCleanups = 0;
+    int numKilledCleanups = 0;
     long mapStarted = 0; 
     long mapFinished = 0; 
     long reduceStarted = 0; 
     long reduceFinished = 0; 
+    long cleanupStarted = 0;
+    long cleanupFinished = 0;
 
     Map <String, String> allHosts = new TreeMap<String, String>();
 
@@ -243,7 +253,7 @@ class HistoryViewer {
                                             attempt.get(Keys.TASK_STATUS))) {
             numKilledMaps++;
           }
-        } else {
+        } else if (Values.REDUCE.name().equals(task.get(Keys.TASK_TYPE))) {
           if (reduceStarted==0||reduceStarted > startTime) {
             reduceStarted = startTime; 
           }
@@ -257,6 +267,23 @@ class HistoryViewer {
                                             attempt.get(Keys.TASK_STATUS))) {
             numKilledReduces++;
           }
+        } else if (Values.CLEANUP.name().equals(task.get(Keys.TASK_TYPE))){
+          if (cleanupStarted==0||cleanupStarted > startTime) {
+            cleanupStarted = startTime; 
+          }
+          if (cleanupFinished < finishTime) {
+            cleanupFinished = finishTime; 
+          }
+          totalCleanups++; 
+          if (Values.SUCCESS.name().equals(attempt.get(Keys.TASK_STATUS))) {
+            numFinishedCleanups++;
+          } else if (Values.FAILED.name().equals(
+                                            attempt.get(Keys.TASK_STATUS))) {
+            numFailedCleanups++;
+          } else if (Values.KILLED.name().equals(
+                                            attempt.get(Keys.TASK_STATUS))) {
+            numKilledCleanups++;
+          }
         }
       }
     }
@@ -283,6 +310,14 @@ class HistoryViewer {
                                dateFormat, reduceStarted, 0));
     taskSummary.append("\t").append(StringUtils.getFormattedTimeWithDiff(
                                dateFormat, reduceFinished, reduceStarted)); 
+    taskSummary.append("\nCleanup\t").append(totalCleanups);
+    taskSummary.append("\t").append(numFinishedCleanups);
+    taskSummary.append("\t\t").append(numFailedCleanups);
+    taskSummary.append("\t").append(numKilledCleanups);
+    taskSummary.append("\t").append(StringUtils.getFormattedTimeWithDiff(
+                               dateFormat, cleanupStarted, 0));
+    taskSummary.append("\t").append(StringUtils.getFormattedTimeWithDiff(
+                               dateFormat, cleanupFinished, cleanupStarted)); 
     taskSummary.append("\n============================\n");
     System.out.println(taskSummary.toString());
   }
@@ -335,7 +370,7 @@ class HistoryViewer {
           if (Values.MAP.name().equals(task.get(Keys.TASK_TYPE))) {
             mapTasks[mapIndex++] = attempt; 
             avgMapTime += avgFinishTime;
-          } else { 
+          } else if (Values.REDUCE.name().equals(task.get(Keys.TASK_TYPE))) { 
             reduceTasks[reduceIndex++] = attempt;
             avgShuffleTime += (attempt.getLong(Keys.SHUFFLE_FINISHED) - 
                                attempt.getLong(Keys.START_TIME));

+ 222 - 39
src/mapred/org/apache/hadoop/mapred/JobHistory.java

@@ -104,7 +104,7 @@ public class JobHistory {
    * most places in history file. 
    */
   public static enum Values {
-    SUCCESS, FAILED, KILLED, MAP, REDUCE
+    SUCCESS, FAILED, KILLED, MAP, REDUCE, CLEANUP
   }
 
   // temp buffer for parsed dataa
@@ -763,41 +763,82 @@ public class JobHistory {
      * @param taskAttemptId task attempt id
      * @param startTime start time of task attempt as reported by task tracker. 
      * @param hostName host name of the task attempt. 
+     * @deprecated Use 
+     *             {@link #logStarted(TaskAttemptID, long, String, boolean)}
      */
+    @Deprecated
     public static void logStarted(TaskAttemptID taskAttemptId, long startTime, String hostName){
+      logStarted(taskAttemptId, startTime, hostName, false);
+    }
+    
+    /**
+     * Log start time of this map task attempt.
+     *  
+     * @param taskAttemptId task attempt id
+     * @param startTime start time of task attempt as reported by task tracker. 
+     * @param hostName host name of the task attempt.
+     * @param isCleanup Whether the attempt is cleanup or not 
+     */
+    public static void logStarted(TaskAttemptID taskAttemptId, long startTime,
+                                  String hostName, boolean isCleanup){
       if (!disableHistory){
         ArrayList<PrintWriter> writer = openJobs.get(JOBTRACKER_UNIQUE_STRING 
-                                                     + taskAttemptId.getJobID()); 
+                                                   + taskAttemptId.getJobID()); 
 
         if (null != writer){
           JobHistory.log(writer, RecordTypes.MapAttempt, 
                          new Keys[]{ Keys.TASK_TYPE, Keys.TASKID, 
                                      Keys.TASK_ATTEMPT_ID, Keys.START_TIME, 
                                      Keys.HOSTNAME},
-                         new String[]{Values.MAP.name(),  taskAttemptId.getTaskID().toString(), 
-                                      taskAttemptId.toString(), String.valueOf(startTime), hostName}); 
+                         new String[]{isCleanup ? Values.CLEANUP.name() : 
+                                                  Values.MAP.name(),
+                                      taskAttemptId.getTaskID().toString(), 
+                                      taskAttemptId.toString(), 
+                                      String.valueOf(startTime), hostName}); 
         }
       }
     }
+    
     /**
      * Log finish time of map task attempt. 
      * @param taskAttemptId task attempt id 
      * @param finishTime finish time
      * @param hostName host name 
+     * @deprecated Use 
+     * {@link #logFinished(TaskAttemptID, long, String, boolean)}
      */
+    @Deprecated
     public static void logFinished(TaskAttemptID taskAttemptId, long finishTime, 
                                    String hostName){
+      logFinished(taskAttemptId, finishTime, hostName, false);
+    }
+
+    /**
+     * Log finish time of map task attempt. 
+     * 
+     * @param taskAttemptId task attempt id 
+     * @param finishTime finish time
+     * @param hostName host name 
+     * @param isCleanup Whether the attempt is cleanup or not 
+     */
+    public static void logFinished(TaskAttemptID taskAttemptId, 
+                                   long finishTime, 
+                                   String hostName,
+                                   boolean isCleanup) {
       if (!disableHistory){
         ArrayList<PrintWriter> writer = openJobs.get(JOBTRACKER_UNIQUE_STRING 
-                                                     + taskAttemptId.getJobID()); 
+                                                   + taskAttemptId.getJobID()); 
 
         if (null != writer){
           JobHistory.log(writer, RecordTypes.MapAttempt, 
                          new Keys[]{ Keys.TASK_TYPE, Keys.TASKID, 
                                      Keys.TASK_ATTEMPT_ID, Keys.TASK_STATUS, 
                                      Keys.FINISH_TIME, Keys.HOSTNAME},
-                         new String[]{Values.MAP.name(), taskAttemptId.getTaskID().toString(),
-                                      taskAttemptId.toString(), Values.SUCCESS.name(),  
+                         new String[]{isCleanup ? Values.CLEANUP.name() : 
+                                                  Values.MAP.name(), 
+                                      taskAttemptId.getTaskID().toString(),
+                                      taskAttemptId.toString(), 
+                                      Values.SUCCESS.name(),  
                                       String.valueOf(finishTime), hostName}); 
         }
       }
@@ -808,44 +849,93 @@ public class JobHistory {
      * @param taskAttemptId task attempt id
      * @param timestamp timestamp
      * @param hostName hostname of this task attempt.
+     * @param error error message if any for this task attempt.
+     * @deprecated Use
+     * {@link #logFailed(TaskAttemptID, long, String, String, boolean)} 
+     */
+    @Deprecated
+    public static void logFailed(TaskAttemptID taskAttemptId, 
+                                 long timestamp, String hostName, 
+                                 String error) {
+      logFailed(taskAttemptId, timestamp, hostName, error, false);
+    }
+
+    /**
+     * Log task attempt failed event. 
+     *  
+     * @param taskAttemptId task attempt id
+     * @param timestamp timestamp
+     * @param hostName hostname of this task attempt.
      * @param error error message if any for this task attempt. 
+     * @param isCleanup Whether the attempt is cleanup or not 
      */
     public static void logFailed(TaskAttemptID taskAttemptId, 
-                                 long timestamp, String hostName, String error){
+                                 long timestamp, String hostName, 
+                                 String error, boolean isCleanup) {
       if (!disableHistory){
         ArrayList<PrintWriter> writer = openJobs.get(JOBTRACKER_UNIQUE_STRING 
-                                                     + taskAttemptId.getJobID()); 
+                                                   + taskAttemptId.getJobID()); 
 
         if (null != writer){
           JobHistory.log(writer, RecordTypes.MapAttempt, 
-                         new Keys[]{Keys.TASK_TYPE, Keys.TASKID, Keys.TASK_ATTEMPT_ID, Keys.TASK_STATUS, 
+                         new Keys[]{Keys.TASK_TYPE, Keys.TASKID, 
+                                    Keys.TASK_ATTEMPT_ID, Keys.TASK_STATUS, 
                                     Keys.FINISH_TIME, Keys.HOSTNAME, Keys.ERROR},
-                         new String[]{ Values.MAP.name(), taskAttemptId.getTaskID().toString(),
-                                       taskAttemptId.toString(), Values.FAILED.name(),
-                                       String.valueOf(timestamp), hostName, error}); 
+                         new String[]{ isCleanup ? Values.CLEANUP.name() :
+                                                   Values.MAP.name(), 
+                                       taskAttemptId.getTaskID().toString(),
+                                       taskAttemptId.toString(), 
+                                       Values.FAILED.name(),
+                                       String.valueOf(timestamp), 
+                                       hostName, error}); 
         }
       }
     }
+    
     /**
      * Log task attempt killed event.  
      * @param taskAttemptId task attempt id
      * @param timestamp timestamp
      * @param hostName hostname of this task attempt.
      * @param error error message if any for this task attempt. 
+     * @deprecated Use 
+     * {@link #logKilled(TaskAttemptID, long, String, String, boolean)}
      */
+    @Deprecated
     public static void logKilled(TaskAttemptID taskAttemptId, 
                                  long timestamp, String hostName, String error){
+      logKilled(taskAttemptId, timestamp, hostName, error, false);
+    } 
+    
+    /**
+     * Log task attempt killed event.  
+     * 
+     * @param taskAttemptId task attempt id
+     * @param timestamp timestamp
+     * @param hostName hostname of this task attempt.
+     * @param error error message if any for this task attempt. 
+     * @param isCleanup Whether the attempt is cleanup or not 
+     */
+    public static void logKilled(TaskAttemptID taskAttemptId, 
+                                 long timestamp, String hostName,
+                                 String error, boolean isCleanup){
       if (!disableHistory){
         ArrayList<PrintWriter> writer = openJobs.get(JOBTRACKER_UNIQUE_STRING 
-                                                     + taskAttemptId.getJobID()); 
+                                                   + taskAttemptId.getJobID()); 
 
         if (null != writer){
           JobHistory.log(writer, RecordTypes.MapAttempt, 
-                         new Keys[]{Keys.TASK_TYPE, Keys.TASKID, Keys.TASK_ATTEMPT_ID, Keys.TASK_STATUS, 
-                                    Keys.FINISH_TIME, Keys.HOSTNAME, Keys.ERROR},
-                         new String[]{ Values.MAP.name(), taskAttemptId.getTaskID().toString(), 
-                                       taskAttemptId.toString(), Values.KILLED.name(),
-                                       String.valueOf(timestamp), hostName, error}); 
+                         new Keys[]{Keys.TASK_TYPE, Keys.TASKID,
+                                    Keys.TASK_ATTEMPT_ID, Keys.TASK_STATUS, 
+                                    Keys.FINISH_TIME, Keys.HOSTNAME,
+                                    Keys.ERROR},
+                         new String[]{ isCleanup ? Values.CLEANUP.name() : 
+                                                   Values.MAP.name(), 
+                                       taskAttemptId.getTaskID().toString(), 
+                                       taskAttemptId.toString(),
+                                       Values.KILLED.name(),
+                                       String.valueOf(timestamp), 
+                                       hostName, error}); 
         }
       }
     } 
@@ -860,22 +950,44 @@ public class JobHistory {
      * @param taskAttemptId task attempt id
      * @param startTime start time
      * @param hostName host name 
+     * @deprecated Use 
+     * {@link #logStarted(TaskAttemptID, long, String, boolean)}
      */
+    @Deprecated
     public static void logStarted(TaskAttemptID taskAttemptId, 
                                   long startTime, String hostName){
+      logStarted(taskAttemptId, startTime, hostName, false);
+    }
+    
+    /**
+     * Log start time of  Reduce task attempt. 
+     * 
+     * @param taskAttemptId task attempt id
+     * @param startTime start time
+     * @param hostName host name 
+     * @param isCleanup Whether the attempt is cleanup or not 
+     */
+    public static void logStarted(TaskAttemptID taskAttemptId, 
+                                  long startTime, String hostName, 
+                                  boolean isCleanup) {
       if (!disableHistory){
         ArrayList<PrintWriter> writer = openJobs.get(JOBTRACKER_UNIQUE_STRING 
-                                                     + taskAttemptId.getJobID()); 
+                                                   + taskAttemptId.getJobID()); 
 
         if (null != writer){
           JobHistory.log(writer, RecordTypes.ReduceAttempt, 
                          new Keys[]{  Keys.TASK_TYPE, Keys.TASKID, 
-                                      Keys.TASK_ATTEMPT_ID, Keys.START_TIME, Keys.HOSTNAME},
-                         new String[]{Values.REDUCE.name(),  taskAttemptId.getTaskID().toString(), 
-                                      taskAttemptId.toString(), String.valueOf(startTime), hostName}); 
+                                      Keys.TASK_ATTEMPT_ID, Keys.START_TIME,
+                                      Keys.HOSTNAME},
+                         new String[]{isCleanup ? Values.CLEANUP.name() : 
+                                                  Values.REDUCE.name(),
+                                      taskAttemptId.getTaskID().toString(), 
+                                      taskAttemptId.toString(), 
+                                      String.valueOf(startTime), hostName}); 
         }
       }
     }
+    
     /**
      * Log finished event of this task. 
      * @param taskAttemptId task attempt id
@@ -883,13 +995,34 @@ public class JobHistory {
      * @param sortFinished sort finish time
      * @param finishTime finish time of task
      * @param hostName host name where task attempt executed
+     * @deprecated Use 
+     * {@link #logFinished(TaskAttemptID, long, long, long, String, boolean)}
      */
+    @Deprecated
     public static void logFinished(TaskAttemptID taskAttemptId, long shuffleFinished, 
                                    long sortFinished, long finishTime, 
                                    String hostName){
+      logFinished(taskAttemptId, shuffleFinished, sortFinished, 
+                  finishTime, hostName, false);
+    }
+    
+    /**
+     * Log finished event of this task. 
+     * 
+     * @param taskAttemptId task attempt id
+     * @param shuffleFinished shuffle finish time
+     * @param sortFinished sort finish time
+     * @param finishTime finish time of task
+     * @param hostName host name where task attempt executed
+     * @param isCleanup Whether the attempt is cleanup or not 
+     */
+    public static void logFinished(TaskAttemptID taskAttemptId, 
+                                   long shuffleFinished, 
+                                   long sortFinished, long finishTime, 
+                                   String hostName, boolean isCleanup) {
       if (!disableHistory){
         ArrayList<PrintWriter> writer = openJobs.get(JOBTRACKER_UNIQUE_STRING 
-                                                     + taskAttemptId.getJobID()); 
+                                                   + taskAttemptId.getJobID()); 
 
         if (null != writer){
           JobHistory.log(writer, RecordTypes.ReduceAttempt, 
@@ -897,48 +1030,95 @@ public class JobHistory {
                                      Keys.TASK_ATTEMPT_ID, Keys.TASK_STATUS, 
                                      Keys.SHUFFLE_FINISHED, Keys.SORT_FINISHED,
                                      Keys.FINISH_TIME, Keys.HOSTNAME},
-                         new String[]{Values.REDUCE.name(),  taskAttemptId.getTaskID().toString(), 
-                                      taskAttemptId.toString(), Values.SUCCESS.name(), 
-                                      String.valueOf(shuffleFinished), String.valueOf(sortFinished),
+                         new String[]{isCleanup ? Values.CLEANUP.name() : 
+                                                  Values.REDUCE.name(),
+                                      taskAttemptId.getTaskID().toString(), 
+                                      taskAttemptId.toString(), 
+                                      Values.SUCCESS.name(), 
+                                      String.valueOf(shuffleFinished), 
+                                      String.valueOf(sortFinished),
                                       String.valueOf(finishTime), hostName}); 
         }
       }
     }
+    
     /**
      * Log failed reduce task attempt. 
      * @param taskAttemptId task attempt id
      * @param timestamp time stamp when task failed
      * @param hostName host name of the task attempt.  
-     * @param error error message of the task. 
+     * @param error error message of the task.
+     * @deprecated Use 
+     * {@link #logFailed(TaskAttemptID, long, String, String, boolean)} 
      */
+    @Deprecated
     public static void logFailed(TaskAttemptID taskAttemptId, long timestamp, 
                                  String hostName, String error){
+      logFailed(taskAttemptId, timestamp, hostName, error, false);
+    }
+    
+    /**
+     * Log failed reduce task attempt.
+     *  
+     * @param taskAttemptId task attempt id
+     * @param timestamp time stamp when task failed
+     * @param hostName host name of the task attempt.  
+     * @param error error message of the task. 
+     * @param isCleanup Whether the attempt is cleanup or not 
+     */
+    public static void logFailed(TaskAttemptID taskAttemptId, long timestamp, 
+                                 String hostName, String error, 
+                                 boolean isCleanup) {
       if (!disableHistory){
         ArrayList<PrintWriter> writer = openJobs.get(JOBTRACKER_UNIQUE_STRING 
-                                                     + taskAttemptId.getJobID()); 
+                                                   + taskAttemptId.getJobID()); 
 
         if (null != writer){
           JobHistory.log(writer, RecordTypes.ReduceAttempt, 
-                         new Keys[]{  Keys.TASK_TYPE, Keys.TASKID, Keys.TASK_ATTEMPT_ID, Keys.TASK_STATUS, 
-                                      Keys.FINISH_TIME, Keys.HOSTNAME, Keys.ERROR },
-                         new String[]{ Values.REDUCE.name(), taskAttemptId.getTaskID().toString(), 
-                                       taskAttemptId.toString(), Values.FAILED.name(), 
+                         new Keys[]{  Keys.TASK_TYPE, Keys.TASKID, 
+                                      Keys.TASK_ATTEMPT_ID, Keys.TASK_STATUS, 
+                                      Keys.FINISH_TIME, Keys.HOSTNAME,
+                                      Keys.ERROR },
+                         new String[]{ isCleanup ? Values.CLEANUP.name() : 
+                                                   Values.REDUCE.name(), 
+                                       taskAttemptId.getTaskID().toString(), 
+                                       taskAttemptId.toString(), 
+                                       Values.FAILED.name(), 
                                        String.valueOf(timestamp), hostName, error }); 
         }
       }
     }
+    
+    /**
+     * Log killed reduce task attempt. 
+     * @param taskAttemptId task attempt id
+     * @param timestamp time stamp when task failed
+     * @param hostName host name of the task attempt.  
+     * @param error error message of the task.
+     * @deprecated Use 
+     * {@link #logKilled(TaskAttemptID, long, String, String, boolean)} 
+     */
+    @Deprecated
+    public static void logKilled(TaskAttemptID taskAttemptId, long timestamp, 
+                                 String hostName, String error) {
+      logKilled(taskAttemptId, timestamp, hostName, error, false);
+    }
+    
     /**
      * Log killed reduce task attempt. 
+     * 
      * @param taskAttemptId task attempt id
      * @param timestamp time stamp when task failed
      * @param hostName host name of the task attempt.  
      * @param error error message of the task. 
+     * @param isCleanup Whether the attempt is cleanup or not 
      */
     public static void logKilled(TaskAttemptID taskAttemptId, long timestamp, 
-                                 String hostName, String error){
+                                 String hostName, String error, 
+                                 boolean isCleanup) {
       if (!disableHistory){
         ArrayList<PrintWriter> writer = openJobs.get(JOBTRACKER_UNIQUE_STRING 
-                                                     + taskAttemptId.getJobID()); 
+                                                   + taskAttemptId.getJobID()); 
 
         if (null != writer){
           JobHistory.log(writer, RecordTypes.ReduceAttempt, 
@@ -946,13 +1126,16 @@ public class JobHistory {
                                       Keys.TASK_ATTEMPT_ID, Keys.TASK_STATUS, 
                                       Keys.FINISH_TIME, Keys.HOSTNAME, 
                                       Keys.ERROR },
-                         new String[]{ Values.REDUCE.name(), taskAttemptId.getTaskID().toString(), 
-                                       taskAttemptId.toString(), Values.KILLED.name(), 
-                                       String.valueOf(timestamp), hostName, error }); 
+                         new String[]{ isCleanup ? Values.CLEANUP.name() : 
+                                                   Values.REDUCE.name(),
+                                       taskAttemptId.getTaskID().toString(), 
+                                       taskAttemptId.toString(), 
+                                       Values.KILLED.name(), 
+                                       String.valueOf(timestamp), 
+                                       hostName, error }); 
         }
       }
     }
-
   }
 
   /**

+ 23 - 18
src/mapred/org/apache/hadoop/mapred/JobInProgress.java

@@ -829,8 +829,7 @@ class JobInProgress {
       launchedCleanup = true;
       if (tip.isFirstAttempt(result.getTaskID())) {
         JobHistory.Task.logStarted(tip.getTIPId(), 
-          tip.isMapTask() ? Values.MAP.name() : Values.REDUCE.name(),
-          System.currentTimeMillis(), tip.getSplitNodes());
+          Values.CLEANUP.name(), System.currentTimeMillis(), "");
       }
     }
     return result;
@@ -1541,19 +1540,24 @@ class JobInProgress {
                                status.getTaskTracker()).getHost()).toString();
     if (status.getIsMap()){
       JobHistory.MapAttempt.logStarted(status.getTaskID(), status.getStartTime(), 
-                                       taskTrackerName); 
+                                       taskTrackerName, tip.isCleanupTask()); 
       JobHistory.MapAttempt.logFinished(status.getTaskID(), status.getFinishTime(), 
-                                        taskTrackerName); 
+                                        taskTrackerName, tip.isCleanupTask()); 
       JobHistory.Task.logFinished(tip.getTIPId(), 
-                                  Values.MAP.name(), status.getFinishTime(),
+                                  tip.isCleanupTask() ? Values.CLEANUP.name() :
+                                  Values.MAP.name(), 
+                                  status.getFinishTime(),
                                   status.getCounters()); 
     }else{
       JobHistory.ReduceAttempt.logStarted( status.getTaskID(), status.getStartTime(), 
-                                          taskTrackerName); 
+                                          taskTrackerName, 
+                                          tip.isCleanupTask()); 
       JobHistory.ReduceAttempt.logFinished(status.getTaskID(), status.getShuffleFinishTime(),
                                            status.getSortFinishTime(), status.getFinishTime(), 
-                                           taskTrackerName); 
+                                           taskTrackerName,
+                                           tip.isCleanupTask()); 
       JobHistory.Task.logFinished(tip.getTIPId(), 
+                                  tip.isCleanupTask() ? Values.CLEANUP.name() :
                                   Values.REDUCE.name(), status.getFinishTime(),
                                   status.getCounters()); 
     }
@@ -1741,23 +1745,26 @@ class JobInProgress {
                                taskTrackerStatus.getHost()).toString();
     if (status.getIsMap()) {
       JobHistory.MapAttempt.logStarted(status.getTaskID(), status.getStartTime(), 
-                taskTrackerName);
+                taskTrackerName, tip.isCleanupTask());
       if (status.getRunState() == TaskStatus.State.FAILED) {
         JobHistory.MapAttempt.logFailed(status.getTaskID(), System.currentTimeMillis(),
-                taskTrackerName, status.getDiagnosticInfo());
+                taskTrackerName, status.getDiagnosticInfo(), tip.isCleanupTask());
       } else {
         JobHistory.MapAttempt.logKilled(status.getTaskID(), System.currentTimeMillis(),
-                taskTrackerName, status.getDiagnosticInfo());
+                taskTrackerName, status.getDiagnosticInfo(),
+                tip.isCleanupTask());
       }
     } else {
       JobHistory.ReduceAttempt.logStarted(status.getTaskID(), status.getStartTime(), 
-                taskTrackerName);
+                taskTrackerName, tip.isCleanupTask());
       if (status.getRunState() == TaskStatus.State.FAILED) {
         JobHistory.ReduceAttempt.logFailed(status.getTaskID(), System.currentTimeMillis(),
-                taskTrackerName, status.getDiagnosticInfo());
+                taskTrackerName, status.getDiagnosticInfo(), 
+                tip.isCleanupTask());
       } else {
         JobHistory.ReduceAttempt.logKilled(status.getTaskID(), System.currentTimeMillis(),
-                taskTrackerName, status.getDiagnosticInfo());
+                taskTrackerName, status.getDiagnosticInfo(), 
+                tip.isCleanupTask());
       }
     }
         
@@ -1801,16 +1808,13 @@ class JobInProgress {
       if (killJob) {
         LOG.info("Aborting job " + profile.getJobID());
         JobHistory.Task.logFailed(tip.getTIPId(), 
+                                  tip.isCleanupTask() ?
+                                    Values.CLEANUP.name() :
                                   tip.isMapTask() ? 
                                           Values.MAP.name() : 
                                           Values.REDUCE.name(),  
                                   System.currentTimeMillis(), 
                                   status.getDiagnosticInfo());
-        JobHistory.JobInfo.logFailed(profile.getJobID(), 
-                                     System.currentTimeMillis(), 
-                                     this.finishedMapTasks, 
-                                     this.finishedReduceTasks
-                                    );
         if (tip.isCleanupTask()) {
           // kill the other tip
           if (tip.isMapTask()) {
@@ -1857,6 +1861,7 @@ class JobInProgress {
                                                     null);
     updateTaskStatus(tip, status, metrics);
     JobHistory.Task.logFailed(tip.getTIPId(), 
+                              tip.isCleanupTask() ? Values.CLEANUP.name() : 
                               tip.isMapTask() ? Values.MAP.name() : Values.REDUCE.name(), 
                               System.currentTimeMillis(), reason); 
   }

+ 1 - 1
src/webapps/job/analysejobhistory.jsp

@@ -61,7 +61,7 @@
         if (Values.MAP.name().equals(task.get(Keys.TASK_TYPE))) {
           mapTasks[mapIndex++] = attempt ; 
           avgMapTime += avgFinishTime;
-        } else { 
+        } else if (Values.REDUCE.name().equals(task.get(Keys.TASK_TYPE))) { 
           reduceTasks[reduceIndex++] = attempt;
           avgShuffleTime += (attempt.getLong(Keys.SHUFFLE_FINISHED) - 
                              attempt.getLong(Keys.START_TIME));

+ 37 - 3
src/webapps/job/jobdetailshistory.jsp

@@ -40,16 +40,22 @@
 <%
     Map<String, JobHistory.Task> tasks = job.getAllTasks();
     int totalMaps = 0 ; 
-    int totalReduces = 0; 
+    int totalReduces = 0;
+    int totalCleanups = 0; 
     int numFailedMaps = 0; 
     int numKilledMaps = 0;
     int numFailedReduces = 0 ; 
     int numKilledReduces = 0;
+    int numFinishedCleanups = 0;
+    int numFailedCleanups = 0;
+    int numKilledCleanups = 0;
 	
     long mapStarted = 0 ; 
     long mapFinished = 0 ; 
     long reduceStarted = 0 ; 
-    long reduceFinished = 0; 
+    long reduceFinished = 0;
+    long cleanupStarted = 0;
+    long cleanupFinished = 0; 
         
     Map <String,String> allHosts = new TreeMap<String,String>();
     for (JobHistory.Task task : tasks.values()) {
@@ -71,7 +77,7 @@
           } else if (Values.KILLED.name().equals(attempt.get(Keys.TASK_STATUS))) {
             numKilledMaps++;
           }
-        } else {
+        } else if (Values.REDUCE.name().equals(task.get(Keys.TASK_TYPE))) {
           if (reduceStarted==0||reduceStarted > startTime) {
             reduceStarted = startTime ; 
           }
@@ -84,6 +90,21 @@
           } else if (Values.KILLED.name().equals(attempt.get(Keys.TASK_STATUS))) {
             numKilledReduces++;
           }
+        } else if (Values.CLEANUP.name().equals(task.get(Keys.TASK_TYPE))) {
+          if (cleanupStarted==0||cleanupStarted > startTime) {
+            cleanupStarted = startTime ; 
+          }
+          if (cleanupFinished < finishTime) {
+            cleanupFinished = finishTime; 
+          }
+          totalCleanups++; 
+          if (Values.SUCCESS.name().equals(attempt.get(Keys.TASK_STATUS))) {
+            numFinishedCleanups++;
+          } else if (Values.FAILED.name().equals(attempt.get(Keys.TASK_STATUS))) {
+            numFailedCleanups++;
+          } else if (Values.KILLED.name().equals(attempt.get(Keys.TASK_STATUS))) {
+            numKilledCleanups++;
+          }
         }
       }
     }
@@ -121,6 +142,19 @@
     <td><%=StringUtils.getFormattedTimeWithDiff(dateFormat, reduceStarted, 0) %></td>
     <td><%=StringUtils.getFormattedTimeWithDiff(dateFormat, reduceFinished, reduceStarted) %></td>
 </tr>
+<tr>
+<td>Cleanup</td>
+    <td><a href="jobtaskshistory.jsp?jobid=<%=jobid %>&logFile=<%=encodedLogFileName%>&taskType=<%=Values.CLEANUP.name() %>&status=all">
+        <%=totalCleanups%></a></td>
+    <td><a href="jobtaskshistory.jsp?jobid=<%=jobid %>&logFile=<%=encodedLogFileName%>&taskType=<%=Values.CLEANUP.name() %>&status=<%=Values.SUCCESS %>">
+        <%=numFinishedCleanups%></a></td>
+    <td><a href="jobtaskshistory.jsp?jobid=<%=jobid %>&logFile=<%=encodedLogFileName%>&taskType=<%=Values.CLEANUP.name() %>&status=<%=Values.FAILED %>">
+        <%=numFailedCleanups%></a></td>
+    <td><a href="jobtaskshistory.jsp?jobid=<%=jobid %>&logFile=<%=encodedLogFileName%>&taskType=<%=Values.CLEANUP.name() %>&status=<%=Values.KILLED %>">
+        <%=numKilledCleanups%></a></td>  
+    <td><%=StringUtils.getFormattedTimeWithDiff(dateFormat, cleanupStarted, 0) %></td>
+    <td><%=StringUtils.getFormattedTimeWithDiff(dateFormat, cleanupFinished, cleanupStarted) %></td>
+</tr>
 </table>
 
 <br/>