Browse Source

Merge -r 702359:702360 from trunk onto 0.19 branch. Fixes HADOOP-4261.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/branches/branch-0.19@702361 13f79535-47bb-0310-9956-ffa450edef68
Devaraj Das 16 years ago
parent
commit
0377047342
26 changed files with 570 additions and 185 deletions
  1. 5 0
      CHANGES.txt
  2. 2 4
      docs/mapred_tutorial.html
  3. 1 1
      docs/mapred_tutorial.pdf
  4. 2 4
      src/docs/src/documentation/content/xdocs/mapred_tutorial.xml
  5. 1 1
      src/mapred/org/apache/hadoop/mapred/EagerTaskInitializationListener.java
  6. 35 0
      src/mapred/org/apache/hadoop/mapred/HistoryViewer.java
  7. 21 8
      src/mapred/org/apache/hadoop/mapred/JobClient.java
  8. 83 54
      src/mapred/org/apache/hadoop/mapred/JobHistory.java
  9. 208 79
      src/mapred/org/apache/hadoop/mapred/JobInProgress.java
  10. 35 0
      src/mapred/org/apache/hadoop/mapred/JobStatus.java
  11. 8 1
      src/mapred/org/apache/hadoop/mapred/JobSubmissionProtocol.java
  12. 47 5
      src/mapred/org/apache/hadoop/mapred/JobTracker.java
  13. 5 0
      src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java
  14. 4 0
      src/mapred/org/apache/hadoop/mapred/MapTask.java
  15. 1 5
      src/mapred/org/apache/hadoop/mapred/OutputCommitter.java
  16. 5 1
      src/mapred/org/apache/hadoop/mapred/ReduceTask.java
  17. 9 0
      src/mapred/org/apache/hadoop/mapred/RunningJob.java
  18. 14 0
      src/mapred/org/apache/hadoop/mapred/Task.java
  19. 17 4
      src/mapred/org/apache/hadoop/mapred/TaskInProgress.java
  20. 2 2
      src/test/org/apache/hadoop/mapred/MiniMRCluster.java
  21. 9 6
      src/test/org/apache/hadoop/mapred/TestJobTrackerRestart.java
  22. 4 1
      src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java
  23. 6 3
      src/webapps/job/jobdetails.jsp
  24. 34 0
      src/webapps/job/jobdetailshistory.jsp
  25. 5 2
      src/webapps/job/jobtasks.jsp
  26. 7 4
      src/webapps/job/taskdetails.jsp

+ 5 - 0
CHANGES.txt

@@ -805,6 +805,11 @@ Release 0.19.0 - Unreleased
     HADOOP-4163. Report FSErrors from map output fetch threads instead of
     HADOOP-4163. Report FSErrors from map output fetch threads instead of
     merely logging them. (Sharad Agarwal via cdouglas)
     merely logging them. (Sharad Agarwal via cdouglas)
 
 
+    HADOOP-4261. Adds a setup task for jobs. This is required so that we 
+    don't setup jobs that haven't been inited yet (since init could lead
+    to job failure). Only after the init has successfully happened do we 
+    launch the setupJob task. (Amareshwari Sriramadasu via ddas)
+
 Release 0.18.2 - Unreleased
 Release 0.18.2 - Unreleased
 
 
   BUG FIXES
   BUG FIXES

+ 2 - 4
docs/mapred_tutorial.html

@@ -2186,14 +2186,12 @@ document.write("Last Published: " + document.lastModified);
 <li>
 <li>
             Setup the job during initialization. For example, create
             Setup the job during initialization. For example, create
             the temporary output directory for the job during the
             the temporary output directory for the job during the
-            initialization of the job. The job client does the setup
-            for the job.
+            initialization of the job. 
           </li>
           </li>
           
           
 <li>
 <li>
             Cleanup the job after the job completion. For example, remove the
             Cleanup the job after the job completion. For example, remove the
-            temporary output directory after the job completion. A separate 
-            task does the cleanupJob at the end of the job.
+            temporary output directory after the job completion.
           </li>
           </li>
           
           
 <li>
 <li>

File diff suppressed because it is too large
+ 1 - 1
docs/mapred_tutorial.pdf


+ 2 - 4
src/docs/src/documentation/content/xdocs/mapred_tutorial.xml

@@ -1594,13 +1594,11 @@
           <li>
           <li>
             Setup the job during initialization. For example, create
             Setup the job during initialization. For example, create
             the temporary output directory for the job during the
             the temporary output directory for the job during the
-            initialization of the job. The job client does the setup
-            for the job.
+            initialization of the job. 
           </li>
           </li>
           <li>
           <li>
             Cleanup the job after the job completion. For example, remove the
             Cleanup the job after the job completion. For example, remove the
-            temporary output directory after the job completion. A separate 
-            task does the cleanupJob at the end of the job.
+            temporary output directory after the job completion.
           </li>
           </li>
           <li>
           <li>
             Setup the task temporary output.
             Setup the task temporary output.

+ 1 - 1
src/mapred/org/apache/hadoop/mapred/EagerTaskInitializationListener.java

@@ -58,7 +58,7 @@ class EagerTaskInitializationListener extends JobInProgressListener {
           LOG.error("Job initialization failed:\n" +
           LOG.error("Job initialization failed:\n" +
                     StringUtils.stringifyException(t));
                     StringUtils.stringifyException(t));
           if (job != null) {
           if (job != null) {
-            job.fail();
+            job.terminateJob(JobStatus.FAILED);
           }
           }
         }
         }
       }
       }

+ 35 - 0
src/mapred/org/apache/hadoop/mapred/HistoryViewer.java

@@ -93,6 +93,8 @@ class HistoryViewer {
     printJobDetails();
     printJobDetails();
     printTaskSummary();
     printTaskSummary();
     printJobAnalysis();
     printJobAnalysis();
+    printTasks("SETUP", "FAILED");
+    printTasks("SETUP", "KILLED");
     printTasks("MAP", "FAILED");
     printTasks("MAP", "FAILED");
     printTasks("MAP", "KILLED");
     printTasks("MAP", "KILLED");
     printTasks("REDUCE", "FAILED");
     printTasks("REDUCE", "FAILED");
@@ -100,9 +102,11 @@ class HistoryViewer {
     printTasks("CLEANUP", "FAILED");
     printTasks("CLEANUP", "FAILED");
     printTasks("CLEANUP", "KILLED");
     printTasks("CLEANUP", "KILLED");
     if (printAll) {
     if (printAll) {
+      printTasks("SETUP", "SUCCESS");
       printTasks("MAP", "SUCCESS");
       printTasks("MAP", "SUCCESS");
       printTasks("REDUCE", "SUCCESS");
       printTasks("REDUCE", "SUCCESS");
       printTasks("CLEANUP", "SUCCESS");
       printTasks("CLEANUP", "SUCCESS");
+      printAllTaskAttempts("SETUP");
       printAllTaskAttempts("MAP");
       printAllTaskAttempts("MAP");
       printAllTaskAttempts("REDUCE");
       printAllTaskAttempts("REDUCE");
       printAllTaskAttempts("CLEANUP");
       printAllTaskAttempts("CLEANUP");
@@ -219,6 +223,7 @@ class HistoryViewer {
     int totalMaps = 0; 
     int totalMaps = 0; 
     int totalReduces = 0; 
     int totalReduces = 0; 
     int totalCleanups = 0;
     int totalCleanups = 0;
+    int totalSetups = 0;
     int numFailedMaps = 0; 
     int numFailedMaps = 0; 
     int numKilledMaps = 0;
     int numKilledMaps = 0;
     int numFailedReduces = 0; 
     int numFailedReduces = 0; 
@@ -226,12 +231,17 @@ class HistoryViewer {
     int numFinishedCleanups = 0;
     int numFinishedCleanups = 0;
     int numFailedCleanups = 0;
     int numFailedCleanups = 0;
     int numKilledCleanups = 0;
     int numKilledCleanups = 0;
+    int numFinishedSetups = 0;
+    int numFailedSetups = 0;
+    int numKilledSetups = 0;
     long mapStarted = 0; 
     long mapStarted = 0; 
     long mapFinished = 0; 
     long mapFinished = 0; 
     long reduceStarted = 0; 
     long reduceStarted = 0; 
     long reduceFinished = 0; 
     long reduceFinished = 0; 
     long cleanupStarted = 0;
     long cleanupStarted = 0;
     long cleanupFinished = 0;
     long cleanupFinished = 0;
+    long setupStarted = 0;
+    long setupFinished = 0;
 
 
     Map <String, String> allHosts = new TreeMap<String, String>();
     Map <String, String> allHosts = new TreeMap<String, String>();
 
 
@@ -286,6 +296,23 @@ class HistoryViewer {
                                             attempt.get(Keys.TASK_STATUS))) {
                                             attempt.get(Keys.TASK_STATUS))) {
             numKilledCleanups++;
             numKilledCleanups++;
           }
           }
+        } else if (Values.SETUP.name().equals(task.get(Keys.TASK_TYPE))){
+          if (setupStarted==0||setupStarted > startTime) {
+            setupStarted = startTime; 
+          }
+          if (setupFinished < finishTime) {
+            setupFinished = finishTime; 
+          }
+          totalSetups++; 
+          if (Values.SUCCESS.name().equals(attempt.get(Keys.TASK_STATUS))) {
+            numFinishedSetups++;
+          } else if (Values.FAILED.name().equals(
+                                            attempt.get(Keys.TASK_STATUS))) {
+            numFailedSetups++;
+          } else if (Values.KILLED.name().equals(
+                                            attempt.get(Keys.TASK_STATUS))) {
+            numKilledSetups++;
+          }
         }
         }
       }
       }
     }
     }
@@ -296,6 +323,14 @@ class HistoryViewer {
     taskSummary.append("\nKind\tTotal\t");
     taskSummary.append("\nKind\tTotal\t");
     taskSummary.append("Successful\tFailed\tKilled\tStartTime\tFinishTime");
     taskSummary.append("Successful\tFailed\tKilled\tStartTime\tFinishTime");
     taskSummary.append("\n");
     taskSummary.append("\n");
+    taskSummary.append("\nSetup\t").append(totalSetups);
+    taskSummary.append("\t").append(numFinishedSetups);
+    taskSummary.append("\t\t").append(numFailedSetups);
+    taskSummary.append("\t").append(numKilledSetups);
+    taskSummary.append("\t").append(StringUtils.getFormattedTimeWithDiff(
+                               dateFormat, setupStarted, 0));
+    taskSummary.append("\t").append(StringUtils.getFormattedTimeWithDiff(
+                               dateFormat, setupFinished, setupStarted)); 
     taskSummary.append("\nMap\t").append(totalMaps);
     taskSummary.append("\nMap\t").append(totalMaps);
     taskSummary.append("\t").append(job.getInt(Keys.FINISHED_MAPS));
     taskSummary.append("\t").append(job.getInt(Keys.FINISHED_MAPS));
     taskSummary.append("\t\t").append(numFailedMaps);
     taskSummary.append("\t\t").append(numFailedMaps);

+ 21 - 8
src/mapred/org/apache/hadoop/mapred/JobClient.java

@@ -261,6 +261,15 @@ public class JobClient extends Configured implements MRConstants, Tool  {
       return status.cleanupProgress();
       return status.cleanupProgress();
     }
     }
 
 
+    /**
+     * A float between 0.0 and 1.0, indicating the % of setup work
+     * completed.
+     */
+    public float setupProgress() throws IOException {
+      ensureFreshStatus();
+      return status.setupProgress();
+    }
+
     /**
     /**
      * Returns immediately whether the whole job is done yet or not.
      * Returns immediately whether the whole job is done yet or not.
      */
      */
@@ -813,13 +822,6 @@ public class JobClient extends Configured implements MRConstants, Tool  {
       out.close();
       out.close();
     }
     }
 
 
-    // skip doing setup if there are no maps for the job.
-    // because if there are no maps, job is considered completed and successful
-    if (splits.length != 0) {
-      // do setupJob
-      job.getOutputCommitter().setupJob(new JobContext(job));
-    }
-
     //
     //
     // Now, actually submit the job (using the submit name)
     // Now, actually submit the job (using the submit name)
     //
     //
@@ -1039,7 +1041,18 @@ public class JobClient extends Configured implements MRConstants, Tool  {
   public TaskReport[] getCleanupTaskReports(JobID jobId) throws IOException {
   public TaskReport[] getCleanupTaskReports(JobID jobId) throws IOException {
     return jobSubmitClient.getCleanupTaskReports(jobId);
     return jobSubmitClient.getCleanupTaskReports(jobId);
   }
   }
-  
+
+  /**
+   * Get the information of the current state of the setup tasks of a job.
+   * 
+   * @param jobId the job to query.
+   * @return the list of all of the setup tips.
+   * @throws IOException
+   */    
+  public TaskReport[] getSetupTaskReports(JobID jobId) throws IOException {
+    return jobSubmitClient.getSetupTaskReports(jobId);
+  }
+
   /**@deprecated Applications should rather use {@link #getReduceTaskReports(JobID)}*/
   /**@deprecated Applications should rather use {@link #getReduceTaskReports(JobID)}*/
   @Deprecated
   @Deprecated
   public TaskReport[] getReduceTaskReports(String jobId) throws IOException {
   public TaskReport[] getReduceTaskReports(String jobId) throws IOException {

+ 83 - 54
src/mapred/org/apache/hadoop/mapred/JobHistory.java

@@ -123,7 +123,7 @@ public class JobHistory {
    * most places in history file. 
    * most places in history file. 
    */
    */
   public static enum Values {
   public static enum Values {
-    SUCCESS, FAILED, KILLED, MAP, REDUCE, CLEANUP, RUNNING
+    SUCCESS, FAILED, KILLED, MAP, REDUCE, CLEANUP, RUNNING, PREP, SETUP
   }
   }
 
 
   // temp buffer for parsed dataa
   // temp buffer for parsed dataa
@@ -923,12 +923,14 @@ public class JobHistory {
     }
     }
     /**
     /**
      * Logs launch time of job. 
      * Logs launch time of job. 
+     * 
      * @param jobId job id, assigned by jobtracker. 
      * @param jobId job id, assigned by jobtracker. 
      * @param startTime start time of job. 
      * @param startTime start time of job. 
      * @param totalMaps total maps assigned by jobtracker. 
      * @param totalMaps total maps assigned by jobtracker. 
      * @param totalReduces total reduces. 
      * @param totalReduces total reduces. 
      */
      */
-    public static void logStarted(JobID jobId, long startTime, int totalMaps, int totalReduces){
+    public static void logInited(JobID jobId, long startTime, 
+                                 int totalMaps, int totalReduces) {
       if (!disableHistory){
       if (!disableHistory){
         String logFileKey =  JOBTRACKER_UNIQUE_STRING + jobId; 
         String logFileKey =  JOBTRACKER_UNIQUE_STRING + jobId; 
         ArrayList<PrintWriter> writer = openJobs.get(logFileKey); 
         ArrayList<PrintWriter> writer = openJobs.get(logFileKey); 
@@ -940,10 +942,45 @@ public class JobHistory {
               new String[] {jobId.toString(), String.valueOf(startTime), 
               new String[] {jobId.toString(), String.valueOf(startTime), 
                             String.valueOf(totalMaps), 
                             String.valueOf(totalMaps), 
                             String.valueOf(totalReduces), 
                             String.valueOf(totalReduces), 
+                            Values.PREP.name()}); 
+        }
+      }
+    }
+    
+   /**
+     * Logs the job as RUNNING. 
+     *
+     * @param jobId job id, assigned by jobtracker. 
+     * @param startTime start time of job. 
+     * @param totalMaps total maps assigned by jobtracker. 
+     * @param totalReduces total reduces. 
+     * @deprecated Use {@link #logInited(JobID, long, int, int)} and 
+     * {@link #logStarted(JobID)}
+     */
+    @Deprecated
+    public static void logStarted(JobID jobId, long startTime, 
+                                  int totalMaps, int totalReduces) {
+      logStarted(jobId);
+    }
+    
+    /**
+     * Logs job as running 
+     * @param jobId job id, assigned by jobtracker. 
+     */
+    public static void logStarted(JobID jobId){
+      if (!disableHistory){
+        String logFileKey =  JOBTRACKER_UNIQUE_STRING + jobId; 
+        ArrayList<PrintWriter> writer = openJobs.get(logFileKey); 
+
+        if (null != writer){
+          JobHistory.log(writer, RecordTypes.Job, 
+              new Keys[] {Keys.JOBID, Keys.JOB_STATUS},
+              new String[] {jobId.toString(),  
                             Values.RUNNING.name()}); 
                             Values.RUNNING.name()}); 
         }
         }
       }
       }
     }
     }
+    
     /**
     /**
      * Log job finished. closes the job file in history. 
      * Log job finished. closes the job file in history. 
      * @param jobId job id, assigned by jobtracker. 
      * @param jobId job id, assigned by jobtracker. 
@@ -1197,12 +1234,11 @@ public class JobHistory {
      * @param startTime start time of task attempt as reported by task tracker. 
      * @param startTime start time of task attempt as reported by task tracker. 
      * @param hostName host name of the task attempt. 
      * @param hostName host name of the task attempt. 
      * @deprecated Use 
      * @deprecated Use 
-     *             {@link #logStarted(TaskAttemptID, long, String, int, 
-     *                                boolean)}
+     *             {@link #logStarted(TaskAttemptID, long, String, int, String)}
      */
      */
     @Deprecated
     @Deprecated
     public static void logStarted(TaskAttemptID taskAttemptId, long startTime, String hostName){
     public static void logStarted(TaskAttemptID taskAttemptId, long startTime, String hostName){
-      logStarted(taskAttemptId, startTime, hostName, -1, false);
+      logStarted(taskAttemptId, startTime, hostName, -1, Values.MAP.name());
     }
     }
     
     
     /**
     /**
@@ -1212,11 +1248,11 @@ public class JobHistory {
      * @param startTime start time of task attempt as reported by task tracker. 
      * @param startTime start time of task attempt as reported by task tracker. 
      * @param trackerName name of the tracker executing the task attempt.
      * @param trackerName name of the tracker executing the task attempt.
      * @param httpPort http port of the task tracker executing the task attempt
      * @param httpPort http port of the task tracker executing the task attempt
-     * @param isCleanup Whether the attempt is cleanup or not 
+     * @param taskType Whether the attempt is cleanup or setup or map 
      */
      */
     public static void logStarted(TaskAttemptID taskAttemptId, long startTime,
     public static void logStarted(TaskAttemptID taskAttemptId, long startTime,
                                   String trackerName, int httpPort, 
                                   String trackerName, int httpPort, 
-                                  boolean isCleanup){
+                                  String taskType) {
       if (!disableHistory){
       if (!disableHistory){
         ArrayList<PrintWriter> writer = openJobs.get(JOBTRACKER_UNIQUE_STRING 
         ArrayList<PrintWriter> writer = openJobs.get(JOBTRACKER_UNIQUE_STRING 
                                                    + taskAttemptId.getJobID()); 
                                                    + taskAttemptId.getJobID()); 
@@ -1226,8 +1262,7 @@ public class JobHistory {
                          new Keys[]{ Keys.TASK_TYPE, Keys.TASKID, 
                          new Keys[]{ Keys.TASK_TYPE, Keys.TASKID, 
                                      Keys.TASK_ATTEMPT_ID, Keys.START_TIME, 
                                      Keys.TASK_ATTEMPT_ID, Keys.START_TIME, 
                                      Keys.TRACKER_NAME, Keys.HTTP_PORT},
                                      Keys.TRACKER_NAME, Keys.HTTP_PORT},
-                         new String[]{isCleanup ? Values.CLEANUP.name() : 
-                                                  Values.MAP.name(),
+                         new String[]{taskType,
                                       taskAttemptId.getTaskID().toString(), 
                                       taskAttemptId.getTaskID().toString(), 
                                       taskAttemptId.toString(), 
                                       taskAttemptId.toString(), 
                                       String.valueOf(startTime), trackerName,
                                       String.valueOf(startTime), trackerName,
@@ -1242,13 +1277,12 @@ public class JobHistory {
      * @param finishTime finish time
      * @param finishTime finish time
      * @param hostName host name 
      * @param hostName host name 
      * @deprecated Use 
      * @deprecated Use 
-     * {@link #logFinished(TaskAttemptID, long, String, boolean, String, 
-     *                     Counters)}
+     * {@link #logFinished(TaskAttemptID, long, String, String, String, Counters)}
      */
      */
     @Deprecated
     @Deprecated
     public static void logFinished(TaskAttemptID taskAttemptId, long finishTime, 
     public static void logFinished(TaskAttemptID taskAttemptId, long finishTime, 
                                    String hostName){
                                    String hostName){
-      logFinished(taskAttemptId, finishTime, hostName, false, "", 
+      logFinished(taskAttemptId, finishTime, hostName, Values.MAP.name(), "", 
                   new Counters());
                   new Counters());
     }
     }
 
 
@@ -1258,14 +1292,15 @@ public class JobHistory {
      * @param taskAttemptId task attempt id 
      * @param taskAttemptId task attempt id 
      * @param finishTime finish time
      * @param finishTime finish time
      * @param hostName host name 
      * @param hostName host name 
-     * @param isCleanup Whether the attempt is cleanup or not 
+     * @param taskType Whether the attempt is cleanup or setup or map 
      * @param stateString state string of the task attempt
      * @param stateString state string of the task attempt
      * @param counter counters of the task attempt
      * @param counter counters of the task attempt
      */
      */
     public static void logFinished(TaskAttemptID taskAttemptId, 
     public static void logFinished(TaskAttemptID taskAttemptId, 
                                    long finishTime, 
                                    long finishTime, 
                                    String hostName,
                                    String hostName,
-                                   boolean isCleanup, String stateString, 
+                                   String taskType,
+                                   String stateString, 
                                    Counters counter) {
                                    Counters counter) {
       if (!disableHistory){
       if (!disableHistory){
         ArrayList<PrintWriter> writer = openJobs.get(JOBTRACKER_UNIQUE_STRING 
         ArrayList<PrintWriter> writer = openJobs.get(JOBTRACKER_UNIQUE_STRING 
@@ -1277,8 +1312,7 @@ public class JobHistory {
                                      Keys.TASK_ATTEMPT_ID, Keys.TASK_STATUS, 
                                      Keys.TASK_ATTEMPT_ID, Keys.TASK_STATUS, 
                                      Keys.FINISH_TIME, Keys.HOSTNAME, 
                                      Keys.FINISH_TIME, Keys.HOSTNAME, 
                                      Keys.STATE_STRING, Keys.COUNTERS},
                                      Keys.STATE_STRING, Keys.COUNTERS},
-                         new String[]{isCleanup ? Values.CLEANUP.name() : 
-                                                  Values.MAP.name(), 
+                         new String[]{taskType, 
                                       taskAttemptId.getTaskID().toString(),
                                       taskAttemptId.getTaskID().toString(),
                                       taskAttemptId.toString(), 
                                       taskAttemptId.toString(), 
                                       Values.SUCCESS.name(),  
                                       Values.SUCCESS.name(),  
@@ -1296,13 +1330,13 @@ public class JobHistory {
      * @param hostName hostname of this task attempt.
      * @param hostName hostname of this task attempt.
      * @param error error message if any for this task attempt.
      * @param error error message if any for this task attempt.
      * @deprecated Use
      * @deprecated Use
-     * {@link #logFailed(TaskAttemptID, long, String, String, boolean)} 
+     * {@link #logFailed(TaskAttemptID, long, String, String, String)} 
      */
      */
     @Deprecated
     @Deprecated
     public static void logFailed(TaskAttemptID taskAttemptId, 
     public static void logFailed(TaskAttemptID taskAttemptId, 
                                  long timestamp, String hostName, 
                                  long timestamp, String hostName, 
                                  String error) {
                                  String error) {
-      logFailed(taskAttemptId, timestamp, hostName, error, false);
+      logFailed(taskAttemptId, timestamp, hostName, error, Values.MAP.name());
     }
     }
 
 
     /**
     /**
@@ -1312,11 +1346,11 @@ public class JobHistory {
      * @param timestamp timestamp
      * @param timestamp timestamp
      * @param hostName hostname of this task attempt.
      * @param hostName hostname of this task attempt.
      * @param error error message if any for this task attempt. 
      * @param error error message if any for this task attempt. 
-     * @param isCleanup Whether the attempt is cleanup or not 
+     * @param taskType Whether the attempt is cleanup or setup or map 
      */
      */
     public static void logFailed(TaskAttemptID taskAttemptId, 
     public static void logFailed(TaskAttemptID taskAttemptId, 
                                  long timestamp, String hostName, 
                                  long timestamp, String hostName, 
-                                 String error, boolean isCleanup) {
+                                 String error, String taskType) {
       if (!disableHistory){
       if (!disableHistory){
         ArrayList<PrintWriter> writer = openJobs.get(JOBTRACKER_UNIQUE_STRING 
         ArrayList<PrintWriter> writer = openJobs.get(JOBTRACKER_UNIQUE_STRING 
                                                    + taskAttemptId.getJobID()); 
                                                    + taskAttemptId.getJobID()); 
@@ -1326,8 +1360,7 @@ public class JobHistory {
                          new Keys[]{Keys.TASK_TYPE, Keys.TASKID, 
                          new Keys[]{Keys.TASK_TYPE, Keys.TASKID, 
                                     Keys.TASK_ATTEMPT_ID, Keys.TASK_STATUS, 
                                     Keys.TASK_ATTEMPT_ID, Keys.TASK_STATUS, 
                                     Keys.FINISH_TIME, Keys.HOSTNAME, Keys.ERROR},
                                     Keys.FINISH_TIME, Keys.HOSTNAME, Keys.ERROR},
-                         new String[]{ isCleanup ? Values.CLEANUP.name() :
-                                                   Values.MAP.name(), 
+                         new String[]{ taskType, 
                                        taskAttemptId.getTaskID().toString(),
                                        taskAttemptId.getTaskID().toString(),
                                        taskAttemptId.toString(), 
                                        taskAttemptId.toString(), 
                                        Values.FAILED.name(),
                                        Values.FAILED.name(),
@@ -1344,12 +1377,12 @@ public class JobHistory {
      * @param hostName hostname of this task attempt.
      * @param hostName hostname of this task attempt.
      * @param error error message if any for this task attempt. 
      * @param error error message if any for this task attempt. 
      * @deprecated Use 
      * @deprecated Use 
-     * {@link #logKilled(TaskAttemptID, long, String, String, boolean)}
+     * {@link #logKilled(TaskAttemptID, long, String, String, String)}
      */
      */
     @Deprecated
     @Deprecated
     public static void logKilled(TaskAttemptID taskAttemptId, 
     public static void logKilled(TaskAttemptID taskAttemptId, 
                                  long timestamp, String hostName, String error){
                                  long timestamp, String hostName, String error){
-      logKilled(taskAttemptId, timestamp, hostName, error, false);
+      logKilled(taskAttemptId, timestamp, hostName, error, Values.MAP.name());
     } 
     } 
     
     
     /**
     /**
@@ -1359,11 +1392,11 @@ public class JobHistory {
      * @param timestamp timestamp
      * @param timestamp timestamp
      * @param hostName hostname of this task attempt.
      * @param hostName hostname of this task attempt.
      * @param error error message if any for this task attempt. 
      * @param error error message if any for this task attempt. 
-     * @param isCleanup Whether the attempt is cleanup or not 
+     * @param taskType Whether the attempt is cleanup or setup or map 
      */
      */
     public static void logKilled(TaskAttemptID taskAttemptId, 
     public static void logKilled(TaskAttemptID taskAttemptId, 
                                  long timestamp, String hostName,
                                  long timestamp, String hostName,
-                                 String error, boolean isCleanup){
+                                 String error, String taskType) {
       if (!disableHistory){
       if (!disableHistory){
         ArrayList<PrintWriter> writer = openJobs.get(JOBTRACKER_UNIQUE_STRING 
         ArrayList<PrintWriter> writer = openJobs.get(JOBTRACKER_UNIQUE_STRING 
                                                    + taskAttemptId.getJobID()); 
                                                    + taskAttemptId.getJobID()); 
@@ -1374,8 +1407,7 @@ public class JobHistory {
                                     Keys.TASK_ATTEMPT_ID, Keys.TASK_STATUS, 
                                     Keys.TASK_ATTEMPT_ID, Keys.TASK_STATUS, 
                                     Keys.FINISH_TIME, Keys.HOSTNAME,
                                     Keys.FINISH_TIME, Keys.HOSTNAME,
                                     Keys.ERROR},
                                     Keys.ERROR},
-                         new String[]{ isCleanup ? Values.CLEANUP.name() : 
-                                                   Values.MAP.name(), 
+                         new String[]{ taskType, 
                                        taskAttemptId.getTaskID().toString(), 
                                        taskAttemptId.getTaskID().toString(), 
                                        taskAttemptId.toString(),
                                        taskAttemptId.toString(),
                                        Values.KILLED.name(),
                                        Values.KILLED.name(),
@@ -1396,12 +1428,12 @@ public class JobHistory {
      * @param startTime start time
      * @param startTime start time
      * @param hostName host name 
      * @param hostName host name 
      * @deprecated Use 
      * @deprecated Use 
-     * {@link #logStarted(TaskAttemptID, long, String, int, boolean)}
+     * {@link #logStarted(TaskAttemptID, long, String, int, String)}
      */
      */
     @Deprecated
     @Deprecated
     public static void logStarted(TaskAttemptID taskAttemptId, 
     public static void logStarted(TaskAttemptID taskAttemptId, 
                                   long startTime, String hostName){
                                   long startTime, String hostName){
-      logStarted(taskAttemptId, startTime, hostName, -1, false);
+      logStarted(taskAttemptId, startTime, hostName, -1, Values.REDUCE.name());
     }
     }
     
     
     /**
     /**
@@ -1411,11 +1443,12 @@ public class JobHistory {
      * @param startTime start time
      * @param startTime start time
      * @param trackerName tracker name 
      * @param trackerName tracker name 
      * @param httpPort the http port of the tracker executing the task attempt
      * @param httpPort the http port of the tracker executing the task attempt
-     * @param isCleanup Whether the attempt is cleanup or not 
+     * @param taskType Whether the attempt is cleanup or setup or reduce 
      */
      */
     public static void logStarted(TaskAttemptID taskAttemptId, 
     public static void logStarted(TaskAttemptID taskAttemptId, 
                                   long startTime, String trackerName, 
                                   long startTime, String trackerName, 
-                                  int httpPort, boolean isCleanup) {
+                                  int httpPort, 
+                                  String taskType) {
       if (!disableHistory){
       if (!disableHistory){
         ArrayList<PrintWriter> writer = openJobs.get(JOBTRACKER_UNIQUE_STRING 
         ArrayList<PrintWriter> writer = openJobs.get(JOBTRACKER_UNIQUE_STRING 
                                                    + taskAttemptId.getJobID()); 
                                                    + taskAttemptId.getJobID()); 
@@ -1425,8 +1458,7 @@ public class JobHistory {
                          new Keys[]{  Keys.TASK_TYPE, Keys.TASKID, 
                          new Keys[]{  Keys.TASK_TYPE, Keys.TASKID, 
                                       Keys.TASK_ATTEMPT_ID, Keys.START_TIME,
                                       Keys.TASK_ATTEMPT_ID, Keys.START_TIME,
                                       Keys.TRACKER_NAME, Keys.HTTP_PORT},
                                       Keys.TRACKER_NAME, Keys.HTTP_PORT},
-                         new String[]{isCleanup ? Values.CLEANUP.name() : 
-                                                  Values.REDUCE.name(),
+                         new String[]{taskType,
                                       taskAttemptId.getTaskID().toString(), 
                                       taskAttemptId.getTaskID().toString(), 
                                       taskAttemptId.toString(), 
                                       taskAttemptId.toString(), 
                                       String.valueOf(startTime), trackerName,
                                       String.valueOf(startTime), trackerName,
@@ -1443,15 +1475,15 @@ public class JobHistory {
      * @param finishTime finish time of task
      * @param finishTime finish time of task
      * @param hostName host name where task attempt executed
      * @param hostName host name where task attempt executed
      * @deprecated Use 
      * @deprecated Use 
-     * {@link #logFinished(TaskAttemptID, long, long, long, String, boolean, 
-     *                     String, Counters)}
+     * {@link #logFinished(TaskAttemptID, long, long, long, String, String, String, Counters)}
      */
      */
     @Deprecated
     @Deprecated
     public static void logFinished(TaskAttemptID taskAttemptId, long shuffleFinished, 
     public static void logFinished(TaskAttemptID taskAttemptId, long shuffleFinished, 
                                    long sortFinished, long finishTime, 
                                    long sortFinished, long finishTime, 
                                    String hostName){
                                    String hostName){
       logFinished(taskAttemptId, shuffleFinished, sortFinished, 
       logFinished(taskAttemptId, shuffleFinished, sortFinished, 
-                  finishTime, hostName, false, "", new Counters());
+                  finishTime, hostName, Values.REDUCE.name(),
+                  "", new Counters());
     }
     }
     
     
     /**
     /**
@@ -1462,14 +1494,14 @@ public class JobHistory {
      * @param sortFinished sort finish time
      * @param sortFinished sort finish time
      * @param finishTime finish time of task
      * @param finishTime finish time of task
      * @param hostName host name where task attempt executed
      * @param hostName host name where task attempt executed
-     * @param isCleanup Whether the attempt is cleanup or not 
+     * @param taskType Whether the attempt is cleanup or setup or reduce 
      * @param stateString the state string of the attempt
      * @param stateString the state string of the attempt
      * @param counter counters of the attempt
      * @param counter counters of the attempt
      */
      */
     public static void logFinished(TaskAttemptID taskAttemptId, 
     public static void logFinished(TaskAttemptID taskAttemptId, 
                                    long shuffleFinished, 
                                    long shuffleFinished, 
                                    long sortFinished, long finishTime, 
                                    long sortFinished, long finishTime, 
-                                   String hostName, boolean isCleanup,
+                                   String hostName, String taskType,
                                    String stateString, Counters counter) {
                                    String stateString, Counters counter) {
       if (!disableHistory){
       if (!disableHistory){
         ArrayList<PrintWriter> writer = openJobs.get(JOBTRACKER_UNIQUE_STRING 
         ArrayList<PrintWriter> writer = openJobs.get(JOBTRACKER_UNIQUE_STRING 
@@ -1482,8 +1514,7 @@ public class JobHistory {
                                      Keys.SHUFFLE_FINISHED, Keys.SORT_FINISHED,
                                      Keys.SHUFFLE_FINISHED, Keys.SORT_FINISHED,
                                      Keys.FINISH_TIME, Keys.HOSTNAME, 
                                      Keys.FINISH_TIME, Keys.HOSTNAME, 
                                      Keys.STATE_STRING, Keys.COUNTERS},
                                      Keys.STATE_STRING, Keys.COUNTERS},
-                         new String[]{isCleanup ? Values.CLEANUP.name() : 
-                                                  Values.REDUCE.name(),
+                         new String[]{taskType,
                                       taskAttemptId.getTaskID().toString(), 
                                       taskAttemptId.getTaskID().toString(), 
                                       taskAttemptId.toString(), 
                                       taskAttemptId.toString(), 
                                       Values.SUCCESS.name(), 
                                       Values.SUCCESS.name(), 
@@ -1503,12 +1534,12 @@ public class JobHistory {
      * @param hostName host name of the task attempt.  
      * @param hostName host name of the task attempt.  
      * @param error error message of the task.
      * @param error error message of the task.
      * @deprecated Use 
      * @deprecated Use 
-     * {@link #logFailed(TaskAttemptID, long, String, String, boolean)} 
+     * {@link #logFailed(TaskAttemptID, long, String, String, String)} 
      */
      */
     @Deprecated
     @Deprecated
     public static void logFailed(TaskAttemptID taskAttemptId, long timestamp, 
     public static void logFailed(TaskAttemptID taskAttemptId, long timestamp, 
                                  String hostName, String error){
                                  String hostName, String error){
-      logFailed(taskAttemptId, timestamp, hostName, error, false);
+      logFailed(taskAttemptId, timestamp, hostName, error, Values.REDUCE.name());
     }
     }
     
     
     /**
     /**
@@ -1518,11 +1549,11 @@ public class JobHistory {
      * @param timestamp time stamp when task failed
      * @param timestamp time stamp when task failed
      * @param hostName host name of the task attempt.  
      * @param hostName host name of the task attempt.  
      * @param error error message of the task. 
      * @param error error message of the task. 
-     * @param isCleanup Whether the attempt is cleanup or not 
+     * @param taskType Whether the attempt is cleanup or setup or reduce 
      */
      */
     public static void logFailed(TaskAttemptID taskAttemptId, long timestamp, 
     public static void logFailed(TaskAttemptID taskAttemptId, long timestamp, 
                                  String hostName, String error, 
                                  String hostName, String error, 
-                                 boolean isCleanup) {
+                                 String taskType) {
       if (!disableHistory){
       if (!disableHistory){
         ArrayList<PrintWriter> writer = openJobs.get(JOBTRACKER_UNIQUE_STRING 
         ArrayList<PrintWriter> writer = openJobs.get(JOBTRACKER_UNIQUE_STRING 
                                                    + taskAttemptId.getJobID()); 
                                                    + taskAttemptId.getJobID()); 
@@ -1533,8 +1564,7 @@ public class JobHistory {
                                       Keys.TASK_ATTEMPT_ID, Keys.TASK_STATUS, 
                                       Keys.TASK_ATTEMPT_ID, Keys.TASK_STATUS, 
                                       Keys.FINISH_TIME, Keys.HOSTNAME,
                                       Keys.FINISH_TIME, Keys.HOSTNAME,
                                       Keys.ERROR },
                                       Keys.ERROR },
-                         new String[]{ isCleanup ? Values.CLEANUP.name() : 
-                                                   Values.REDUCE.name(), 
+                         new String[]{ taskType, 
                                        taskAttemptId.getTaskID().toString(), 
                                        taskAttemptId.getTaskID().toString(), 
                                        taskAttemptId.toString(), 
                                        taskAttemptId.toString(), 
                                        Values.FAILED.name(), 
                                        Values.FAILED.name(), 
@@ -1550,12 +1580,12 @@ public class JobHistory {
      * @param hostName host name of the task attempt.  
      * @param hostName host name of the task attempt.  
      * @param error error message of the task.
      * @param error error message of the task.
      * @deprecated Use 
      * @deprecated Use 
-     * {@link #logKilled(TaskAttemptID, long, String, String, boolean)} 
+     * {@link #logKilled(TaskAttemptID, long, String, String, String)} 
      */
      */
     @Deprecated
     @Deprecated
     public static void logKilled(TaskAttemptID taskAttemptId, long timestamp, 
     public static void logKilled(TaskAttemptID taskAttemptId, long timestamp, 
                                  String hostName, String error) {
                                  String hostName, String error) {
-      logKilled(taskAttemptId, timestamp, hostName, error, false);
+      logKilled(taskAttemptId, timestamp, hostName, error, Values.REDUCE.name());
     }
     }
     
     
     /**
     /**
@@ -1565,11 +1595,11 @@ public class JobHistory {
      * @param timestamp time stamp when task failed
      * @param timestamp time stamp when task failed
      * @param hostName host name of the task attempt.  
      * @param hostName host name of the task attempt.  
      * @param error error message of the task. 
      * @param error error message of the task. 
-     * @param isCleanup Whether the attempt is cleanup or not 
-     */
+     * @param taskType Whether the attempt is cleanup or setup or reduce 
+    */
     public static void logKilled(TaskAttemptID taskAttemptId, long timestamp, 
     public static void logKilled(TaskAttemptID taskAttemptId, long timestamp, 
                                  String hostName, String error, 
                                  String hostName, String error, 
-                                 boolean isCleanup) {
+                                 String taskType) {
       if (!disableHistory){
       if (!disableHistory){
         ArrayList<PrintWriter> writer = openJobs.get(JOBTRACKER_UNIQUE_STRING 
         ArrayList<PrintWriter> writer = openJobs.get(JOBTRACKER_UNIQUE_STRING 
                                                    + taskAttemptId.getJobID()); 
                                                    + taskAttemptId.getJobID()); 
@@ -1580,8 +1610,7 @@ public class JobHistory {
                                       Keys.TASK_ATTEMPT_ID, Keys.TASK_STATUS, 
                                       Keys.TASK_ATTEMPT_ID, Keys.TASK_STATUS, 
                                       Keys.FINISH_TIME, Keys.HOSTNAME, 
                                       Keys.FINISH_TIME, Keys.HOSTNAME, 
                                       Keys.ERROR },
                                       Keys.ERROR },
-                         new String[]{ isCleanup ? Values.CLEANUP.name() : 
-                                                   Values.REDUCE.name(),
+                         new String[]{ taskType,
                                        taskAttemptId.getTaskID().toString(), 
                                        taskAttemptId.getTaskID().toString(), 
                                        taskAttemptId.toString(), 
                                        taskAttemptId.toString(), 
                                        Values.KILLED.name(), 
                                        Values.KILLED.name(), 

+ 208 - 79
src/mapred/org/apache/hadoop/mapred/JobInProgress.java

@@ -63,6 +63,7 @@ class JobInProgress {
   TaskInProgress maps[] = new TaskInProgress[0];
   TaskInProgress maps[] = new TaskInProgress[0];
   TaskInProgress reduces[] = new TaskInProgress[0];
   TaskInProgress reduces[] = new TaskInProgress[0];
   TaskInProgress cleanup[] = new TaskInProgress[0];
   TaskInProgress cleanup[] = new TaskInProgress[0];
+  TaskInProgress setup[] = new TaskInProgress[0];
   int numMapTasks = 0;
   int numMapTasks = 0;
   int numReduceTasks = 0;
   int numReduceTasks = 0;
   
   
@@ -83,6 +84,7 @@ class JobInProgress {
   int failedMapTIPs = 0;
   int failedMapTIPs = 0;
   int failedReduceTIPs = 0;
   int failedReduceTIPs = 0;
   private volatile boolean launchedCleanup = false;
   private volatile boolean launchedCleanup = false;
+  private volatile boolean launchedSetup = false;
   private volatile boolean jobKilled = false;
   private volatile boolean jobKilled = false;
   private volatile boolean jobFailed = false;
   private volatile boolean jobFailed = false;
 
 
@@ -382,12 +384,13 @@ class JobInProgress {
       // Finished time need to be setted here to prevent this job to be retired
       // Finished time need to be setted here to prevent this job to be retired
       // from the job tracker jobs at the next retire iteration.
       // from the job tracker jobs at the next retire iteration.
       this.finishTime = this.launchTime;
       this.finishTime = this.launchTime;
+      status.setSetupProgress(1.0f);
       status.setMapProgress(1.0f);
       status.setMapProgress(1.0f);
       status.setReduceProgress(1.0f);
       status.setReduceProgress(1.0f);
       status.setCleanupProgress(1.0f);
       status.setCleanupProgress(1.0f);
       status.setRunState(JobStatus.SUCCEEDED);
       status.setRunState(JobStatus.SUCCEEDED);
       tasksInited.set(true);
       tasksInited.set(true);
-      JobHistory.JobInfo.logStarted(profile.getJobID(), 
+      JobHistory.JobInfo.logInited(profile.getJobID(), 
                                     this.launchTime, 0, 0);
                                     this.launchTime, 0, 0);
       JobHistory.JobInfo.logFinished(profile.getJobID(), 
       JobHistory.JobInfo.logFinished(profile.getJobID(), 
                                      this.finishTime, 0, 0, 0, 0,
                                      this.finishTime, 0, 0, 0, 0,
@@ -422,12 +425,22 @@ class JobInProgress {
                        numReduceTasks, jobtracker, conf, this);
                        numReduceTasks, jobtracker, conf, this);
     cleanup[1].setCleanupTask();
     cleanup[1].setCleanupTask();
 
 
-    this.status = new JobStatus(status.getJobID(), 0.0f, 0.0f, 0.0f, 
-                                          JobStatus.RUNNING, status.getJobPriority());
+    // create two setup tips, one map and one reduce.
+    setup = new TaskInProgress[2];
+    // setup map tip. This map is doesn't use split. 
+    // Just assign splits[0]
+    setup[0] = new TaskInProgress(jobId, jobFile, splits[0], 
+            jobtracker, conf, this, numMapTasks + 1 );
+    setup[0].setSetupTask();
+
+    // setup reduce tip.
+    setup[1] = new TaskInProgress(jobId, jobFile, numMapTasks,
+                       numReduceTasks + 1, jobtracker, conf, this);
+    setup[1].setSetupTask();
+    
     tasksInited.set(true);
     tasksInited.set(true);
-        
-    JobHistory.JobInfo.logStarted(profile.getJobID(), this.launchTime, 
-                                  numMapTasks, numReduceTasks);
+    JobHistory.JobInfo.logInited(profile.getJobID(), this.launchTime, 
+                                 numMapTasks, numReduceTasks);
   }
   }
 
 
   /////////////////////////////////////////////////////
   /////////////////////////////////////////////////////
@@ -532,6 +545,14 @@ class JobInProgress {
     return cleanup;
     return cleanup;
   }
   }
   
   
+  /**
+   * Get the list of setup tasks
+   * @return the array of setup tasks for the job
+   */
+  TaskInProgress[] getSetupTasks() {
+    return setup;
+  }
+  
   /**
   /**
    * Get the list of reduce tasks
    * Get the list of reduce tasks
    * @return the raw array of reduce tasks for this job
    * @return the raw array of reduce tasks for this job
@@ -611,6 +632,21 @@ class JobInProgress {
     return results;
     return results;
   }
   }
 
 
+  /**
+   * Return a vector of setup TaskInProgress objects
+   */
+  public synchronized Vector<TaskInProgress> reportSetupTIPs(
+                                               boolean shouldBeComplete) {
+    
+    Vector<TaskInProgress> results = new Vector<TaskInProgress>();
+    for (int i = 0; i < setup.length; i++) {
+      if (setup[i].isComplete() == shouldBeComplete) {
+        results.add(setup[i]);
+      }
+    }
+    return results;
+  }
+
   ////////////////////////////////////////////////////
   ////////////////////////////////////////////////////
   // Status update methods
   // Status update methods
   ////////////////////////////////////////////////////
   ////////////////////////////////////////////////////
@@ -655,7 +691,9 @@ class JobInProgress {
                                             taskCompletionEventTracker, 
                                             taskCompletionEventTracker, 
                                             taskid,
                                             taskid,
                                             tip.idWithinJob(),
                                             tip.idWithinJob(),
-                                            status.getIsMap(),
+                                            status.getIsMap() &&
+                                            !tip.isCleanupTask() &&
+                                            !tip.isSetupTask(),
                                             TaskCompletionEvent.Status.SUCCEEDED,
                                             TaskCompletionEvent.Status.SUCCEEDED,
                                             httpTaskLogLocation 
                                             httpTaskLogLocation 
                                            );
                                            );
@@ -698,7 +736,9 @@ class JobInProgress {
         taskEvent = new TaskCompletionEvent(taskCompletionEventTracker, 
         taskEvent = new TaskCompletionEvent(taskCompletionEventTracker, 
                                             taskid,
                                             taskid,
                                             tip.idWithinJob(),
                                             tip.idWithinJob(),
-                                            status.getIsMap(),
+                                            status.getIsMap() &&
+                                            !tip.isCleanupTask() &&
+                                            !tip.isSetupTask(),
                                             taskCompletionStatus, 
                                             taskCompletionStatus, 
                                             httpTaskLogLocation
                                             httpTaskLogLocation
                                            );
                                            );
@@ -727,7 +767,7 @@ class JobInProgress {
                  oldProgress + " to " + tip.getProgress());
                  oldProgress + " to " + tip.getProgress());
     }
     }
     
     
-    if (!tip.isCleanupTask()) {
+    if (!tip.isCleanupTask() && !tip.isSetupTask()) {
       double progressDelta = tip.getProgress() - oldProgress;
       double progressDelta = tip.getProgress() - oldProgress;
       if (tip.isMapTask()) {
       if (tip.isMapTask()) {
         if (maps.length == 0) {
         if (maps.length == 0) {
@@ -860,11 +900,7 @@ class JobInProgress {
     // Now launch the cleanupTask
     // Now launch the cleanupTask
     Task result = tip.getTaskToRun(tts.getTrackerName());
     Task result = tip.getTaskToRun(tts.getTrackerName());
     if (result != null) {
     if (result != null) {
-      launchedCleanup = true;
-      if (tip.isFirstAttempt(result.getTaskID())) {
-        JobHistory.Task.logStarted(tip.getTIPId(), 
-          Values.CLEANUP.name(), System.currentTimeMillis(), "");
-      }
+      addRunningTaskToTIP(tip, result.getTaskID(), tts, true);
     }
     }
     return result;
     return result;
   }
   }
@@ -878,8 +914,12 @@ class JobInProgress {
    * @return true/false
    * @return true/false
    */
    */
   private synchronized boolean canLaunchCleanupTask() {
   private synchronized boolean canLaunchCleanupTask() {
+    if (!tasksInited.get()) {
+      return false;
+    }
     // check if the job is running
     // check if the job is running
-    if (status.getRunState() != JobStatus.RUNNING) {
+    if (status.getRunState() != JobStatus.RUNNING &&
+        status.getRunState() != JobStatus.PREP) {
       return false;
       return false;
     }
     }
     // check if cleanup task has been launched already. 
     // check if cleanup task has been launched already. 
@@ -899,7 +939,62 @@ class JobInProgress {
     }
     }
     return launchCleanupTask;
     return launchCleanupTask;
   }
   }
+
+  /**
+   * Return a SetupTask, if appropriate, to run on the given tasktracker
+   * 
+   */
+  public synchronized Task obtainSetupTask(TaskTrackerStatus tts, 
+                                             int clusterSize, 
+                                             int numUniqueHosts,
+                                             boolean isMapSlot
+                                            ) throws IOException {
+    if (!canLaunchSetupTask()) {
+      return null;
+    }
+    
+    String taskTracker = tts.getTrackerName();
+    // Update the last-known clusterSize
+    this.clusterSize = clusterSize;
+    if (!shouldRunOnTaskTracker(taskTracker)) {
+      return null;
+    }
+    
+    List<TaskInProgress> setupTaskList = new ArrayList<TaskInProgress>();
+    if (isMapSlot) {
+      setupTaskList.add(setup[0]);
+    } else {
+      setupTaskList.add(setup[1]);
+    }
+    TaskInProgress tip = findTaskFromList(setupTaskList,
+                           tts, numUniqueHosts, false);
+    if (tip == null) {
+      return null;
+    }
+    
+    // Now launch the setupTask
+    Task result = tip.getTaskToRun(tts.getTrackerName());
+    if (result != null) {
+      addRunningTaskToTIP(tip, result.getTaskID(), tts, true);
+    }
+    return result;
+  }
+  
+  /**
+   * Check whether setup task can be launched for the job.
+   * 
+   * Setup task can be launched after the tasks are inited
+   * and Job is in PREP state
+   * and if it is not already launched
+   * or job is not Killed/Failed
+   * @return true/false
+   */
+  private synchronized boolean canLaunchSetupTask() {
+    return (tasksInited.get() && status.getRunState() == JobStatus.PREP && 
+           !launchedSetup && !jobKilled && !jobFailed);
+  }
   
   
+
   /**
   /**
    * Return a ReduceTask, if appropriate, to run on the given tasktracker.
    * Return a ReduceTask, if appropriate, to run on the given tasktracker.
    * We don't have cache-sensitivity for reduce tasks, as they
    * We don't have cache-sensitivity for reduce tasks, as they
@@ -955,8 +1050,14 @@ class JobInProgress {
                                         boolean isScheduled) {
                                         boolean isScheduled) {
     // keeping the earlier ordering intact
     // keeping the earlier ordering intact
     String name;
     String name;
-    Enum counter;
-    if (tip.isMapTask()) {
+    Enum counter = null;
+    if (tip.isSetupTask()) {
+      launchedSetup = true;
+      name = Values.SETUP.name();
+    } else if (tip.isCleanupTask()) {
+      launchedCleanup = true;
+      name = Values.CLEANUP.name();
+    } else if (tip.isMapTask()) {
       ++runningMapTasks;
       ++runningMapTasks;
       name = Values.MAP.name();
       name = Values.MAP.name();
       counter = Counter.TOTAL_LAUNCHED_MAPS;
       counter = Counter.TOTAL_LAUNCHED_MAPS;
@@ -975,7 +1076,9 @@ class JobInProgress {
       JobHistory.Task.logStarted(tip.getTIPId(), name,
       JobHistory.Task.logStarted(tip.getTIPId(), name,
                                  tip.getExecStartTime(), "");
                                  tip.getExecStartTime(), "");
     }
     }
-    jobCounters.incrCounter(counter, 1);
+    if (!tip.isSetupTask() && !tip.isCleanupTask()) {
+      jobCounters.incrCounter(counter, 1);
+    }
     
     
     // Make an entry in the tip if the attempt is not scheduled i.e externally
     // Make an entry in the tip if the attempt is not scheduled i.e externally
     // added
     // added
@@ -994,7 +1097,7 @@ class JobInProgress {
     //
     //
     // So to simplify, increment the data locality counter whenever there is 
     // So to simplify, increment the data locality counter whenever there is 
     // data locality.
     // data locality.
-    if (tip.isMapTask()) {
+    if (tip.isMapTask() && !tip.isSetupTask() && !tip.isCleanupTask()) {
       // increment the data locality counter for maps
       // increment the data locality counter for maps
       Node tracker = jobtracker.getNode(tts.getHost());
       Node tracker = jobtracker.getNode(tts.getHost());
       int level = this.maxLevel;
       int level = this.maxLevel;
@@ -1648,65 +1751,45 @@ class JobInProgress {
     TaskTrackerStatus ttStatus = 
     TaskTrackerStatus ttStatus = 
       this.jobtracker.getTaskTracker(status.getTaskTracker());
       this.jobtracker.getTaskTracker(status.getTaskTracker());
     String trackerHostname = jobtracker.getNode(ttStatus.getHost()).toString();
     String trackerHostname = jobtracker.getNode(ttStatus.getHost()).toString();
+    String taskType = tip.isCleanupTask() ? Values.CLEANUP.name() :
+                      tip.isSetupTask() ? Values.SETUP.name() :
+                      tip.isMapTask() ? Values.MAP.name() : 
+                      Values.REDUCE.name();
     if (status.getIsMap()){
     if (status.getIsMap()){
       JobHistory.MapAttempt.logStarted(status.getTaskID(), status.getStartTime(), 
       JobHistory.MapAttempt.logStarted(status.getTaskID(), status.getStartTime(), 
                                        status.getTaskTracker(), 
                                        status.getTaskTracker(), 
                                        ttStatus.getHttpPort(), 
                                        ttStatus.getHttpPort(), 
-                                       tip.isCleanupTask()); 
+                                       taskType); 
       JobHistory.MapAttempt.logFinished(status.getTaskID(), status.getFinishTime(), 
       JobHistory.MapAttempt.logFinished(status.getTaskID(), status.getFinishTime(), 
-                                        trackerHostname, tip.isCleanupTask(),
+                                        trackerHostname, taskType,
                                         status.getStateString(), 
                                         status.getStateString(), 
                                         status.getCounters()); 
                                         status.getCounters()); 
-      JobHistory.Task.logFinished(tip.getTIPId(), 
-                                  tip.isCleanupTask() ? Values.CLEANUP.name() :
-                                  Values.MAP.name(), 
-                                  tip.getExecFinishTime(),
-                                  status.getCounters()); 
     }else{
     }else{
       JobHistory.ReduceAttempt.logStarted( status.getTaskID(), status.getStartTime(), 
       JobHistory.ReduceAttempt.logStarted( status.getTaskID(), status.getStartTime(), 
                                           status.getTaskTracker(),
                                           status.getTaskTracker(),
                                           ttStatus.getHttpPort(), 
                                           ttStatus.getHttpPort(), 
-                                          tip.isCleanupTask()); 
+                                          taskType); 
       JobHistory.ReduceAttempt.logFinished(status.getTaskID(), status.getShuffleFinishTime(),
       JobHistory.ReduceAttempt.logFinished(status.getTaskID(), status.getShuffleFinishTime(),
                                            status.getSortFinishTime(), status.getFinishTime(), 
                                            status.getSortFinishTime(), status.getFinishTime(), 
-                                           trackerHostname, tip.isCleanupTask(), 
+                                           trackerHostname, 
+                                           taskType,
                                            status.getStateString(), 
                                            status.getStateString(), 
                                            status.getCounters()); 
                                            status.getCounters()); 
-      JobHistory.Task.logFinished(tip.getTIPId(), 
-                                  tip.isCleanupTask() ? Values.CLEANUP.name() :
-                                  Values.REDUCE.name(), tip.getExecFinishTime(),
-                                  status.getCounters()); 
     }
     }
+    JobHistory.Task.logFinished(tip.getTIPId(), 
+                                taskType,
+                                tip.getExecFinishTime(),
+                                status.getCounters()); 
         
         
     int newNumAttempts = tip.getActiveTasks().size();
     int newNumAttempts = tip.getActiveTasks().size();
-    if (!tip.isCleanupTask()) {
-      if (tip.isMapTask()) {
-        runningMapTasks -= 1;
-        // check if this was a sepculative task
-        if (oldNumAttempts > 1) {
-          speculativeMapTasks -= (oldNumAttempts - newNumAttempts);
-        }
-        finishedMapTasks += 1;
-        metrics.completeMap(taskid);
-        // remove the completed map from the resp running caches
-        retireMap(tip);
-        if ((finishedMapTasks + failedMapTIPs) == (numMapTasks)) {
-          this.status.setMapProgress(1.0f);
-        }
-      } else {
-        runningReduceTasks -= 1;
-        if (oldNumAttempts > 1) {
-          speculativeReduceTasks -= (oldNumAttempts - newNumAttempts);
-        }
-        finishedReduceTasks += 1;
-        metrics.completeReduce(taskid);
-        // remove the completed reduces from the running reducers set
-        retireReduce(tip);
-        if ((finishedReduceTasks + failedReduceTIPs) == (numReduceTasks)) {
-          this.status.setReduceProgress(1.0f);
-        }
-      }
-    } else {
+    if (tip.isSetupTask()) {
+      // setup task has finished. kill the extra setup tip
+      killSetupTip(!tip.isMapTask());
+      // Job can start running now.
+      this.status.setSetupProgress(1.0f);
+      this.status.setRunState(JobStatus.RUNNING);
+      JobHistory.JobInfo.logStarted(profile.getJobID());
+    } else if (tip.isCleanupTask()) {
       // cleanup task has finished. Kill the extra cleanup tip
       // cleanup task has finished. Kill the extra cleanup tip
       if (tip.isMapTask()) {
       if (tip.isMapTask()) {
         // kill the reduce tip
         // kill the reduce tip
@@ -1730,7 +1813,31 @@ class JobInProgress {
       // The job has been killed/failed/successful
       // The job has been killed/failed/successful
       // JobTracker should cleanup this task
       // JobTracker should cleanup this task
       jobtracker.markCompletedTaskAttempt(status.getTaskTracker(), taskid);
       jobtracker.markCompletedTaskAttempt(status.getTaskTracker(), taskid);
-      return false;
+    } else if (tip.isMapTask()) {
+      runningMapTasks -= 1;
+      // check if this was a sepculative task
+      if (oldNumAttempts > 1) {
+        speculativeMapTasks -= (oldNumAttempts - newNumAttempts);
+      }
+      finishedMapTasks += 1;
+      metrics.completeMap(taskid);
+      // remove the completed map from the resp running caches
+      retireMap(tip);
+      if ((finishedMapTasks + failedMapTIPs) == (numMapTasks)) {
+        this.status.setMapProgress(1.0f);
+      }
+    } else {
+      runningReduceTasks -= 1;
+      if (oldNumAttempts > 1) {
+        speculativeReduceTasks -= (oldNumAttempts - newNumAttempts);
+      }
+      finishedReduceTasks += 1;
+      metrics.completeReduce(taskid);
+      // remove the completed reduces from the running reducers set
+      retireReduce(tip);
+      if ((finishedReduceTasks + failedReduceTIPs) == (numReduceTasks)) {
+        this.status.setReduceProgress(1.0f);
+      }
     }
     }
     
     
     return true;
     return true;
@@ -1764,7 +1871,7 @@ class JobInProgress {
     }
     }
   }
   }
   
   
-  private synchronized void terminateJob(int jobTerminationState) {
+  synchronized void terminateJob(int jobTerminationState) {
     if ((status.getRunState() == JobStatus.RUNNING) ||
     if ((status.getRunState() == JobStatus.RUNNING) ||
         (status.getRunState() == JobStatus.PREP)) {
         (status.getRunState() == JobStatus.PREP)) {
       if (jobTerminationState == JobStatus.FAILED) {
       if (jobTerminationState == JobStatus.FAILED) {
@@ -1859,6 +1966,8 @@ class JobInProgress {
     if (wasRunning && !isRunning) {
     if (wasRunning && !isRunning) {
       if (tip.isCleanupTask()) {
       if (tip.isCleanupTask()) {
         launchedCleanup = false;
         launchedCleanup = false;
+      } else if (tip.isSetupTask()) {
+        launchedSetup = false;
       } else if (tip.isMapTask()) {
       } else if (tip.isMapTask()) {
         runningMapTasks -= 1;
         runningMapTasks -= 1;
         // remove from the running queue and put it in the non-running cache
         // remove from the running queue and put it in the non-running cache
@@ -1896,36 +2005,41 @@ class JobInProgress {
     // update job history
     // update job history
     String taskTrackerName = taskTrackerStatus.getHost();
     String taskTrackerName = taskTrackerStatus.getHost();
     long finishTime = status.getFinishTime();
     long finishTime = status.getFinishTime();
+    String taskType = tip.isCleanupTask() ? Values.CLEANUP.name() :
+                      tip.isSetupTask() ? Values.SETUP.name() :
+                      tip.isMapTask() ? Values.MAP.name() : 
+                      Values.REDUCE.name();
     if (status.getIsMap()) {
     if (status.getIsMap()) {
       JobHistory.MapAttempt.logStarted(status.getTaskID(), status.getStartTime(), 
       JobHistory.MapAttempt.logStarted(status.getTaskID(), status.getStartTime(), 
           status.getTaskTracker(), taskTrackerStatus.getHttpPort(), 
           status.getTaskTracker(), taskTrackerStatus.getHttpPort(), 
-          tip.isCleanupTask());
+          taskType);
       if (status.getRunState() == TaskStatus.State.FAILED) {
       if (status.getRunState() == TaskStatus.State.FAILED) {
         JobHistory.MapAttempt.logFailed(status.getTaskID(), finishTime,
         JobHistory.MapAttempt.logFailed(status.getTaskID(), finishTime,
-                taskTrackerName, status.getDiagnosticInfo(), tip.isCleanupTask());
+                taskTrackerName, status.getDiagnosticInfo(), 
+                taskType);
       } else {
       } else {
         JobHistory.MapAttempt.logKilled(status.getTaskID(), finishTime,
         JobHistory.MapAttempt.logKilled(status.getTaskID(), finishTime,
                 taskTrackerName, status.getDiagnosticInfo(),
                 taskTrackerName, status.getDiagnosticInfo(),
-                tip.isCleanupTask());
+                taskType);
       }
       }
     } else {
     } else {
       JobHistory.ReduceAttempt.logStarted(status.getTaskID(), status.getStartTime(), 
       JobHistory.ReduceAttempt.logStarted(status.getTaskID(), status.getStartTime(), 
           status.getTaskTracker(), taskTrackerStatus.getHttpPort(), 
           status.getTaskTracker(), taskTrackerStatus.getHttpPort(), 
-          tip.isCleanupTask());
+          taskType);
       if (status.getRunState() == TaskStatus.State.FAILED) {
       if (status.getRunState() == TaskStatus.State.FAILED) {
         JobHistory.ReduceAttempt.logFailed(status.getTaskID(), finishTime,
         JobHistory.ReduceAttempt.logFailed(status.getTaskID(), finishTime,
                 taskTrackerName, status.getDiagnosticInfo(), 
                 taskTrackerName, status.getDiagnosticInfo(), 
-                tip.isCleanupTask());
+                taskType);
       } else {
       } else {
         JobHistory.ReduceAttempt.logKilled(status.getTaskID(), finishTime,
         JobHistory.ReduceAttempt.logKilled(status.getTaskID(), finishTime,
                 taskTrackerName, status.getDiagnosticInfo(), 
                 taskTrackerName, status.getDiagnosticInfo(), 
-                tip.isCleanupTask());
+                taskType);
       }
       }
     }
     }
         
         
     // After this, try to assign tasks with the one after this, so that
     // After this, try to assign tasks with the one after this, so that
     // the failed task goes to the end of the list.
     // the failed task goes to the end of the list.
-    if (!tip.isCleanupTask()) {
+    if (!tip.isCleanupTask() && !tip.isSetupTask()) {
       if (tip.isMapTask()) {
       if (tip.isMapTask()) {
         failedMapTasks++;
         failedMapTasks++;
       } else {
       } else {
@@ -1955,7 +2069,7 @@ class JobInProgress {
       // Allow upto 'mapFailuresPercent' of map tasks to fail or
       // Allow upto 'mapFailuresPercent' of map tasks to fail or
       // 'reduceFailuresPercent' of reduce tasks to fail
       // 'reduceFailuresPercent' of reduce tasks to fail
       //
       //
-      boolean killJob = tip.isCleanupTask() ? true :
+      boolean killJob = tip.isCleanupTask() || tip.isSetupTask() ? true :
                         tip.isMapTask() ? 
                         tip.isMapTask() ? 
             ((++failedMapTIPs*100) > (mapFailuresPercent*numMapTasks)) :
             ((++failedMapTIPs*100) > (mapFailuresPercent*numMapTasks)) :
             ((++failedReduceTIPs*100) > (reduceFailuresPercent*numReduceTasks));
             ((++failedReduceTIPs*100) > (reduceFailuresPercent*numReduceTasks));
@@ -1963,12 +2077,8 @@ class JobInProgress {
       if (killJob) {
       if (killJob) {
         LOG.info("Aborting job " + profile.getJobID());
         LOG.info("Aborting job " + profile.getJobID());
         JobHistory.Task.logFailed(tip.getTIPId(), 
         JobHistory.Task.logFailed(tip.getTIPId(), 
-                                  tip.isCleanupTask() ?
-                                    Values.CLEANUP.name() :
-                                  tip.isMapTask() ? 
-                                          Values.MAP.name() : 
-                                          Values.REDUCE.name(),  
-                                          status.getFinishTime(), 
+                                  taskType,  
+                                  status.getFinishTime(), 
                                   status.getDiagnosticInfo());
                                   status.getDiagnosticInfo());
         if (tip.isCleanupTask()) {
         if (tip.isCleanupTask()) {
           // kill the other tip
           // kill the other tip
@@ -1979,6 +2089,10 @@ class JobInProgress {
           }
           }
           terminateJob(JobStatus.FAILED);
           terminateJob(JobStatus.FAILED);
         } else {
         } else {
+          if (tip.isSetupTask()) {
+            // kill the other tip
+            killSetupTip(!tip.isMapTask());
+          }
           fail();
           fail();
         }
         }
       }
       }
@@ -1986,7 +2100,7 @@ class JobInProgress {
       //
       //
       // Update the counters
       // Update the counters
       //
       //
-      if (!tip.isCleanupTask()) {
+      if (!tip.isCleanupTask() && !tip.isSetupTask()) {
         if (tip.isMapTask()) {
         if (tip.isMapTask()) {
           jobCounters.incrCounter(Counter.NUM_FAILED_MAPS, 1);
           jobCounters.incrCounter(Counter.NUM_FAILED_MAPS, 1);
         } else {
         } else {
@@ -1996,6 +2110,14 @@ class JobInProgress {
     }
     }
   }
   }
 
 
+  void killSetupTip(boolean isMap) {
+    if (isMap) {
+      setup[0].kill();
+    } else {
+      setup[1].kill();
+    }
+  }
+
   /**
   /**
    * Fail a task with a given reason, but without a status object.
    * Fail a task with a given reason, but without a status object.
    * @param tip The task's tip
    * @param tip The task's tip
@@ -2018,6 +2140,7 @@ class JobInProgress {
     updateTaskStatus(tip, status, metrics);
     updateTaskStatus(tip, status, metrics);
     JobHistory.Task.logFailed(tip.getTIPId(), 
     JobHistory.Task.logFailed(tip.getTIPId(), 
                               tip.isCleanupTask() ? Values.CLEANUP.name() : 
                               tip.isCleanupTask() ? Values.CLEANUP.name() : 
+                              tip.isSetupTask() ? Values.SETUP.name() : 
                               tip.isMapTask() ? Values.MAP.name() : Values.REDUCE.name(), 
                               tip.isMapTask() ? Values.MAP.name() : Values.REDUCE.name(), 
                               tip.getExecFinishTime(), reason, taskid); 
                               tip.getExecFinishTime(), reason, taskid); 
   }
   }
@@ -2072,18 +2195,24 @@ class JobInProgress {
    */
    */
   public synchronized TaskInProgress getTaskInProgress(TaskID tipid) {
   public synchronized TaskInProgress getTaskInProgress(TaskID tipid) {
     if (tipid.isMap()) {
     if (tipid.isMap()) {
-      if (tipid.getId() == maps.length) { // cleanup map tip
+      if (tipid.equals(cleanup[0].getTIPId())) { // cleanup map tip
         return cleanup[0]; 
         return cleanup[0]; 
       }
       }
+      if (tipid.equals(setup[0].getTIPId())) { //setup map tip
+        return setup[0];
+      }
       for (int i = 0; i < maps.length; i++) {
       for (int i = 0; i < maps.length; i++) {
         if (tipid.equals(maps[i].getTIPId())){
         if (tipid.equals(maps[i].getTIPId())){
           return maps[i];
           return maps[i];
         }
         }
       }
       }
     } else {
     } else {
-      if (tipid.getId() == reduces.length) { // cleanup reduce tip
+      if (tipid.equals(cleanup[1].getTIPId())) { // cleanup reduce tip
         return cleanup[1]; 
         return cleanup[1]; 
       }
       }
+      if (tipid.equals(setup[1].getTIPId())) { //setup reduce tip
+        return setup[1];
+      }
       for (int i = 0; i < reduces.length; i++) {
       for (int i = 0; i < reduces.length; i++) {
         if (tipid.equals(reduces[i].getTIPId())){
         if (tipid.equals(reduces[i].getTIPId())){
           return reduces[i];
           return reduces[i];

+ 35 - 0
src/mapred/org/apache/hadoop/mapred/JobStatus.java

@@ -52,6 +52,7 @@ public class JobStatus implements Writable {
   private float mapProgress;
   private float mapProgress;
   private float reduceProgress;
   private float reduceProgress;
   private float cleanupProgress;
   private float cleanupProgress;
+  private float setupProgress;
   private int runState;
   private int runState;
   private long startTime;
   private long startTime;
   private String user;
   private String user;
@@ -99,7 +100,25 @@ public class JobStatus implements Writable {
    */
    */
    public JobStatus(JobID jobid, float mapProgress, float reduceProgress,
    public JobStatus(JobID jobid, float mapProgress, float reduceProgress,
                       float cleanupProgress, int runState, JobPriority jp) {
                       float cleanupProgress, int runState, JobPriority jp) {
+     this(jobid, 0.0f, mapProgress, reduceProgress, 
+          cleanupProgress, runState, jp);
+   }
+   
+  /**
+   * Create a job status object for a given jobid.
+   * @param jobid The jobid of the job
+   * @param setupProgress The progress made on the setup
+   * @param mapProgress The progress made on the maps
+   * @param reduceProgress The progress made on the reduces
+   * @param cleanupProgress The progress made on the cleanup
+   * @param runState The current state of the job
+   * @param jp Priority of the job.
+   */
+   public JobStatus(JobID jobid, float setupProgress, float mapProgress,
+                    float reduceProgress, float cleanupProgress, 
+                    int runState, JobPriority jp) {
      this.jobid = jobid;
      this.jobid = jobid;
+     this.setupProgress = setupProgress;
      this.mapProgress = mapProgress;
      this.mapProgress = mapProgress;
      this.reduceProgress = reduceProgress;
      this.reduceProgress = reduceProgress;
      this.cleanupProgress = cleanupProgress;
      this.cleanupProgress = cleanupProgress;
@@ -110,6 +129,7 @@ public class JobStatus implements Writable {
      }
      }
      priority = jp;
      priority = jp;
    }
    }
+   
   /**
   /**
    * @deprecated use getJobID instead
    * @deprecated use getJobID instead
    */
    */
@@ -147,6 +167,19 @@ public class JobStatus implements Writable {
     this.cleanupProgress = (float) Math.min(1.0, Math.max(0.0, p)); 
     this.cleanupProgress = (float) Math.min(1.0, Math.max(0.0, p)); 
   }
   }
 
 
+  /**
+   * @return Percentage of progress in setup 
+   */
+  public synchronized float setupProgress() { return setupProgress; }
+    
+  /**
+   * Sets the setup progress of this job
+   * @param p The value of setup progress to set to
+   */
+  synchronized void setSetupProgress(float p) { 
+    this.setupProgress = (float) Math.min(1.0, Math.max(0.0, p)); 
+  }
+
   /**
   /**
    * @return Percentage of progress in reduce 
    * @return Percentage of progress in reduce 
    */
    */
@@ -232,6 +265,7 @@ public class JobStatus implements Writable {
   ///////////////////////////////////////
   ///////////////////////////////////////
   public synchronized void write(DataOutput out) throws IOException {
   public synchronized void write(DataOutput out) throws IOException {
     jobid.write(out);
     jobid.write(out);
+    out.writeFloat(setupProgress);
     out.writeFloat(mapProgress);
     out.writeFloat(mapProgress);
     out.writeFloat(reduceProgress);
     out.writeFloat(reduceProgress);
     out.writeFloat(cleanupProgress);
     out.writeFloat(cleanupProgress);
@@ -244,6 +278,7 @@ public class JobStatus implements Writable {
 
 
   public synchronized void readFields(DataInput in) throws IOException {
   public synchronized void readFields(DataInput in) throws IOException {
     this.jobid = JobID.read(in);
     this.jobid = JobID.read(in);
+    this.setupProgress = in.readFloat();
     this.mapProgress = in.readFloat();
     this.mapProgress = in.readFloat();
     this.reduceProgress = in.readFloat();
     this.reduceProgress = in.readFloat();
     this.cleanupProgress = in.readFloat();
     this.cleanupProgress = in.readFloat();

+ 8 - 1
src/mapred/org/apache/hadoop/mapred/JobSubmissionProtocol.java

@@ -47,8 +47,10 @@ interface JobSubmissionProtocol extends VersionedProtocol {
    *             and getAllJobs(queue) as a part of HADOOP-3930
    *             and getAllJobs(queue) as a part of HADOOP-3930
    * Version 14: Added setPriority for HADOOP-4124
    * Version 14: Added setPriority for HADOOP-4124
    * Version 15: Added KILLED status to JobStatus as part of HADOOP-3924            
    * Version 15: Added KILLED status to JobStatus as part of HADOOP-3924            
+   * Version 16: Added getSetupTaskReports and 
+   *             setupProgress to JobStatus as part of HADOOP-4261           
    */
    */
-  public static final long versionID = 15L;
+  public static final long versionID = 16L;
 
 
   /**
   /**
    * Allocate a name for the job.
    * Allocate a name for the job.
@@ -122,6 +124,11 @@ interface JobSubmissionProtocol extends VersionedProtocol {
    */
    */
   public TaskReport[] getCleanupTaskReports(JobID jobid) throws IOException;
   public TaskReport[] getCleanupTaskReports(JobID jobid) throws IOException;
 
 
+  /**
+   * Grab a bunch of info on the setup tasks that make up the job
+   */
+  public TaskReport[] getSetupTaskReports(JobID jobid) throws IOException;
+
   /**
   /**
    * A MapReduce system always operates on a single filesystem.  This 
    * A MapReduce system always operates on a single filesystem.  This 
    * function returns the fs name.  ('local' if the localfs; 'addr:port' 
    * function returns the fs name.  ('local' if the localfs; 'addr:port' 

+ 47 - 5
src/mapred/org/apache/hadoop/mapred/JobTracker.java

@@ -531,7 +531,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
       // is updated
       // is updated
       private void checkAndInit() throws IOException {
       private void checkAndInit() throws IOException {
         String jobStatus = this.job.get(Keys.JOB_STATUS);
         String jobStatus = this.job.get(Keys.JOB_STATUS);
-        if (Values.RUNNING.name().equals(jobStatus)) {
+        if (Values.PREP.name().equals(jobStatus)) {
           hasUpdates = true;
           hasUpdates = true;
           LOG.info("Calling init from RM for job " + jip.getJobID().toString());
           LOG.info("Calling init from RM for job " + jip.getJobID().toString());
           jip.initTasks();
           jip.initTasks();
@@ -1860,7 +1860,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
       if (taskTrackerStatus == null) {
       if (taskTrackerStatus == null) {
         LOG.warn("Unknown task tracker polling; ignoring: " + trackerName);
         LOG.warn("Unknown task tracker polling; ignoring: " + trackerName);
       } else {
       } else {
-        List<Task> tasks = getCleanupTask(taskTrackerStatus);
+        List<Task> tasks = getSetupAndCleanupTasks(taskTrackerStatus);
         if (tasks == null ) {
         if (tasks == null ) {
           tasks = taskScheduler.assignTasks(taskTrackerStatus);
           tasks = taskScheduler.assignTasks(taskTrackerStatus);
         }
         }
@@ -2099,8 +2099,9 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
     return null;
     return null;
   }
   }
   
   
-  private synchronized List<Task> getCleanupTask(TaskTrackerStatus taskTracker)
-  throws IOException {
+  // returns cleanup tasks first, then setup tasks.
+  private synchronized List<Task> getSetupAndCleanupTasks(
+    TaskTrackerStatus taskTracker) throws IOException {
     int maxMapTasks = taskTracker.getMaxMapTasks();
     int maxMapTasks = taskTracker.getMaxMapTasks();
     int maxReduceTasks = taskTracker.getMaxReduceTasks();
     int maxReduceTasks = taskTracker.getMaxReduceTasks();
     int numMaps = taskTracker.countMapTasks();
     int numMaps = taskTracker.countMapTasks();
@@ -2120,6 +2121,15 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
             return Collections.singletonList(t);
             return Collections.singletonList(t);
           }
           }
         }
         }
+        for (Iterator<JobInProgress> it = jobs.values().iterator();
+             it.hasNext();) {
+          JobInProgress job = it.next();
+          t = job.obtainSetupTask(taskTracker, numTaskTrackers,
+                                  numUniqueHosts, true);
+          if (t != null) {
+            return Collections.singletonList(t);
+          }
+        }
       }
       }
       if (numReduces < maxReduceTasks) {
       if (numReduces < maxReduceTasks) {
         for (Iterator<JobInProgress> it = jobs.values().iterator();
         for (Iterator<JobInProgress> it = jobs.values().iterator();
@@ -2131,6 +2141,15 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
             return Collections.singletonList(t);
             return Collections.singletonList(t);
           }
           }
         }
         }
+        for (Iterator<JobInProgress> it = jobs.values().iterator();
+             it.hasNext();) {
+          JobInProgress job = it.next();
+          t = job.obtainSetupTask(taskTracker, numTaskTrackers,
+                                    numUniqueHosts, false);
+          if (t != null) {
+            return Collections.singletonList(t);
+          }
+        }
       }
       }
     }
     }
     return null;
     return null;
@@ -2353,6 +2372,28 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
   
   
   }
   }
   
   
+  public synchronized TaskReport[] getSetupTaskReports(JobID jobid) {
+    JobInProgress job = jobs.get(jobid);
+    if (job == null) {
+      return new TaskReport[0];
+    } else {
+      Vector<TaskReport> reports = new Vector<TaskReport>();
+      Vector<TaskInProgress> completeTasks = job.reportSetupTIPs(true);
+      for (Iterator<TaskInProgress> it = completeTasks.iterator();
+           it.hasNext();) {
+        TaskInProgress tip = (TaskInProgress) it.next();
+        reports.add(tip.generateSingleReport());
+      }
+      Vector<TaskInProgress> incompleteTasks = job.reportSetupTIPs(false);
+      for (Iterator<TaskInProgress> it = incompleteTasks.iterator(); 
+           it.hasNext();) {
+        TaskInProgress tip = (TaskInProgress) it.next();
+        reports.add(tip.generateSingleReport());
+      }
+      return reports.toArray(new TaskReport[reports.size()]);
+    }
+  }
+  
   TaskCompletionEvent[] EMPTY_EVENTS = new TaskCompletionEvent[0];
   TaskCompletionEvent[] EMPTY_EVENTS = new TaskCompletionEvent[0];
   
   
   /* 
   /* 
@@ -2576,7 +2617,8 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
         // And completed maps with zero reducers of the job 
         // And completed maps with zero reducers of the job 
         // never need to be failed. 
         // never need to be failed. 
         if (!tip.isComplete() || 
         if (!tip.isComplete() || 
-            (tip.isMapTask() && job.desiredReduces() != 0)) {
+            (tip.isMapTask() && !tip.isSetupTask() && 
+             job.desiredReduces() != 0)) {
           // if the job is done, we don't want to change anything
           // if the job is done, we don't want to change anything
           if (job.getStatus().getRunState() == JobStatus.RUNNING) {
           if (job.getStatus().getRunState() == JobStatus.RUNNING) {
             job.failedTask(tip, taskId, ("Lost task tracker: " + trackerName), 
             job.failedTask(tip, taskId, ("Lost task tracker: " + trackerName), 

+ 5 - 0
src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java

@@ -114,6 +114,8 @@ class LocalJobRunner implements JobSubmissionProtocol {
         }
         }
         JobContext jContext = new JobContext(conf);
         JobContext jContext = new JobContext(conf);
         OutputCommitter outputCommitter = job.getOutputCommitter();
         OutputCommitter outputCommitter = job.getOutputCommitter();
+        outputCommitter.setupJob(jContext);
+        status.setSetupProgress(1.0f);
         
         
         DataOutputBuffer buffer = new DataOutputBuffer();
         DataOutputBuffer buffer = new DataOutputBuffer();
         for (int i = 0; i < splits.length; i++) {
         for (int i = 0; i < splits.length; i++) {
@@ -336,6 +338,9 @@ class LocalJobRunner implements JobSubmissionProtocol {
   public TaskReport[] getCleanupTaskReports(JobID id) {
   public TaskReport[] getCleanupTaskReports(JobID id) {
     return new TaskReport[0];
     return new TaskReport[0];
   }
   }
+  public TaskReport[] getSetupTaskReports(JobID id) {
+    return new TaskReport[0];
+  }
 
 
   public JobStatus getJobStatus(JobID id) {
   public JobStatus getJobStatus(JobID id) {
     Job job = jobs.get(id);
     Job job = jobs.get(id);

+ 4 - 0
src/mapred/org/apache/hadoop/mapred/MapTask.java

@@ -283,6 +283,10 @@ class MapTask extends Task {
       runCleanup(umbilical);
       runCleanup(umbilical);
       return;
       return;
     }
     }
+    if (setupJob) {
+      runSetupJob(umbilical);
+      return;
+    }
 
 
     int numReduceTasks = conf.getNumReduceTasks();
     int numReduceTasks = conf.getNumReduceTasks();
     LOG.info("numReduceTasks: " + numReduceTasks);
     LOG.info("numReduceTasks: " + numReduceTasks);

+ 1 - 5
src/mapred/org/apache/hadoop/mapred/OutputCommitter.java

@@ -30,12 +30,10 @@ import java.io.IOException;
  *   <li>
  *   <li>
  *   Setup the job during initialization. For example, create the temporary 
  *   Setup the job during initialization. For example, create the temporary 
  *   output directory for the job during the initialization of the job.
  *   output directory for the job during the initialization of the job.
- *   The job client does the setup for the job.
  *   </li>
  *   </li>
  *   <li>
  *   <li>
  *   Cleanup the job after the job completion. For example, remove the
  *   Cleanup the job after the job completion. For example, remove the
- *   temporary output directory after the job completion. CleanupJob is done
- *   by a separate task at the end of the job.
+ *   temporary output directory after the job completion. 
  *   </li>
  *   </li>
  *   <li>
  *   <li>
  *   Setup the task temporary output.
  *   Setup the task temporary output.
@@ -61,8 +59,6 @@ public abstract class OutputCommitter {
   /**
   /**
    * For the framework to setup the job output during initialization
    * For the framework to setup the job output during initialization
    * 
    * 
-   * The job client does the setup for the job.
-   *   
    * @param jobContext Context of the job whose output is being written.
    * @param jobContext Context of the job whose output is being written.
    * @throws IOException if temporary output could not be created
    * @throws IOException if temporary output could not be created
    */
    */

+ 5 - 1
src/mapred/org/apache/hadoop/mapred/ReduceTask.java

@@ -336,7 +336,7 @@ class ReduceTask extends Task {
     job.setBoolean("mapred.skip.on", isSkipping());
     job.setBoolean("mapred.skip.on", isSkipping());
     Reducer reducer = ReflectionUtils.newInstance(job.getReducerClass(), job);
     Reducer reducer = ReflectionUtils.newInstance(job.getReducerClass(), job);
 
 
-    if (!cleanupJob) {
+    if (!cleanupJob && !setupJob) {
       copyPhase = getProgress().addPhase("copy");
       copyPhase = getProgress().addPhase("copy");
       sortPhase  = getProgress().addPhase("sort");
       sortPhase  = getProgress().addPhase("sort");
       reducePhase = getProgress().addPhase("reduce");
       reducePhase = getProgress().addPhase("reduce");
@@ -351,6 +351,10 @@ class ReduceTask extends Task {
       runCleanup(umbilical);
       runCleanup(umbilical);
       return;
       return;
     }
     }
+    if (setupJob) {
+      runSetupJob(umbilical);
+      return;
+    }
     
     
     // Initialize the codec
     // Initialize the codec
     codec = initCodec();
     codec = initCodec();

+ 9 - 0
src/mapred/org/apache/hadoop/mapred/RunningJob.java

@@ -93,6 +93,15 @@ public interface RunningJob {
    */
    */
   public float cleanupProgress() throws IOException;
   public float cleanupProgress() throws IOException;
 
 
+  /**
+   * Get the <i>progress</i> of the job's setup-tasks, as a float between 0.0 
+   * and 1.0.  When all setup tasks have completed, the function returns 1.0.
+   * 
+   * @return the progress of the job's setup-tasks.
+   * @throws IOException
+   */
+  public float setupProgress() throws IOException;
+
   /**
   /**
    * Check if the job is finished or not. 
    * Check if the job is finished or not. 
    * This is a non-blocking call.
    * This is a non-blocking call.

+ 14 - 0
src/mapred/org/apache/hadoop/mapred/Task.java

@@ -108,6 +108,7 @@ abstract class Task implements Writable, Configurable {
   private int partition;                          // id within job
   private int partition;                          // id within job
   TaskStatus taskStatus;                          // current status of the task
   TaskStatus taskStatus;                          // current status of the task
   protected boolean cleanupJob = false;
   protected boolean cleanupJob = false;
+  protected boolean setupJob = false;
   private Thread pingProgressThread;
   private Thread pingProgressThread;
   
   
   //skip ranges based on failed ranges from previous attempts
   //skip ranges based on failed ranges from previous attempts
@@ -241,6 +242,9 @@ abstract class Task implements Writable, Configurable {
     cleanupJob = true;
     cleanupJob = true;
   }
   }
 
 
+  public void setSetupTask() {
+    setupJob = true; 
+  }
   ////////////////////////////////////////////
   ////////////////////////////////////////////
   // Writable methods
   // Writable methods
   ////////////////////////////////////////////
   ////////////////////////////////////////////
@@ -253,6 +257,7 @@ abstract class Task implements Writable, Configurable {
     skipRanges.write(out);
     skipRanges.write(out);
     out.writeBoolean(skipping);
     out.writeBoolean(skipping);
     out.writeBoolean(cleanupJob);
     out.writeBoolean(cleanupJob);
+    out.writeBoolean(setupJob);
     out.writeBoolean(writeSkipRecs);
     out.writeBoolean(writeSkipRecs);
   }
   }
   public void readFields(DataInput in) throws IOException {
   public void readFields(DataInput in) throws IOException {
@@ -266,6 +271,7 @@ abstract class Task implements Writable, Configurable {
     currentRecStartIndex = currentRecIndexIterator.next();
     currentRecStartIndex = currentRecIndexIterator.next();
     skipping = in.readBoolean();
     skipping = in.readBoolean();
     cleanupJob = in.readBoolean();
     cleanupJob = in.readBoolean();
+    setupJob = in.readBoolean();
     writeSkipRecs = in.readBoolean();
     writeSkipRecs = in.readBoolean();
   }
   }
 
 
@@ -718,6 +724,14 @@ abstract class Task implements Writable, Configurable {
     conf.getOutputCommitter().cleanupJob(jobContext);
     conf.getOutputCommitter().cleanupJob(jobContext);
     done(umbilical);
     done(umbilical);
   }
   }
+
+  protected void runSetupJob(TaskUmbilicalProtocol umbilical) 
+  throws IOException {
+    // do the setup
+    getProgress().setStatus("setup");
+    conf.getOutputCommitter().setupJob(jobContext);
+    done(umbilical);
+  }
   
   
   public void setConf(Configuration conf) {
   public void setConf(Configuration conf) {
     if (conf instanceof JobConf) {
     if (conf instanceof JobConf) {

+ 17 - 4
src/mapred/org/apache/hadoop/mapred/TaskInProgress.java

@@ -83,6 +83,7 @@ class TaskInProgress {
   private FailedRanges failedRanges = new FailedRanges();
   private FailedRanges failedRanges = new FailedRanges();
   private volatile boolean skipping = false;
   private volatile boolean skipping = false;
   private boolean cleanup = false; 
   private boolean cleanup = false; 
+  private boolean setup = false;
    
    
   // The 'next' usable taskid of this tip
   // The 'next' usable taskid of this tip
   int nextTaskId = 0;
   int nextTaskId = 0;
@@ -180,7 +181,15 @@ class TaskInProgress {
   public void setCleanupTask() {
   public void setCleanupTask() {
     cleanup = true;
     cleanup = true;
   }
   }
-  
+
+  public boolean isSetupTask() {
+    return setup;
+  }
+	  
+  public void setSetupTask() {
+    setup = true;
+  }
+
   public boolean isOnlyCommitPending() {
   public boolean isOnlyCommitPending() {
     for (TaskStatus t : taskStatuses.values()) {
     for (TaskStatus t : taskStatuses.values()) {
       if (t.getRunState() == TaskStatus.State.COMMIT_PENDING) {
       if (t.getRunState() == TaskStatus.State.COMMIT_PENDING) {
@@ -380,7 +389,8 @@ class TaskInProgress {
         (job.getStatus().getRunState() != JobStatus.RUNNING)) {
         (job.getStatus().getRunState() != JobStatus.RUNNING)) {
       tasksReportedClosed.add(taskid);
       tasksReportedClosed.add(taskid);
       close = true;
       close = true;
-    } else if (isComplete() && !(isMapTask() && isComplete(taskid)) &&
+    } else if (isComplete() && 
+               !(isMapTask() && !setup && !cleanup && isComplete(taskid)) &&
                !tasksReportedClosed.contains(taskid)) {
                !tasksReportedClosed.contains(taskid)) {
       tasksReportedClosed.add(taskid);
       tasksReportedClosed.add(taskid);
       close = true; 
       close = true; 
@@ -565,7 +575,7 @@ class TaskInProgress {
     // should note this failure only for completed maps, only if this taskid;
     // should note this failure only for completed maps, only if this taskid;
     // completed this map. however if the job is done, there is no need to 
     // completed this map. however if the job is done, there is no need to 
     // manipulate completed maps
     // manipulate completed maps
-    if (this.isMapTask() && isComplete(taskid) && 
+    if (this.isMapTask() && !setup && !cleanup && isComplete(taskid) && 
         jobStatus.getRunState() != JobStatus.SUCCEEDED) {
         jobStatus.getRunState() != JobStatus.SUCCEEDED) {
       this.completes--;
       this.completes--;
       
       
@@ -850,6 +860,9 @@ class TaskInProgress {
     if (cleanup) {
     if (cleanup) {
       t.setCleanupTask();
       t.setCleanupTask();
     }
     }
+    if (setup) {
+      t.setSetupTask();
+    }
     t.setConf(conf);
     t.setConf(conf);
     LOG.debug("Launching task with skipRanges:"+failedRanges.getSkipRanges());
     LOG.debug("Launching task with skipRanges:"+failedRanges.getSkipRanges());
     t.setSkipRanges(failedRanges.getSkipRanges());
     t.setSkipRanges(failedRanges.getSkipRanges());
@@ -951,7 +964,7 @@ class TaskInProgress {
   }
   }
 
 
   public long getMapInputSize() {
   public long getMapInputSize() {
-    if(isMapTask()) {
+    if(isMapTask() && !setup && !cleanup) {
       return rawSplit.getDataLength();
       return rawSplit.getDataLength();
     } else {
     } else {
       return 0;
       return 0;

+ 2 - 2
src/test/org/apache/hadoop/mapred/MiniMRCluster.java

@@ -464,9 +464,9 @@ public class MiniMRCluster {
   }
   }
     
     
   /**
   /**
-   * Get the map task completion events
+   * Get the task completion events
    */
    */
-  public TaskCompletionEvent[] getMapTaskCompletionEvents(JobID id, int from, 
+  public TaskCompletionEvent[] getTaskCompletionEvents(JobID id, int from, 
                                                           int max) 
                                                           int max) 
   throws IOException {
   throws IOException {
     return jobTracker.getJobTracker().getTaskCompletionEvents(id, from, max);
     return jobTracker.getJobTracker().getTaskCompletionEvents(id, from, max);

+ 9 - 6
src/test/org/apache/hadoop/mapred/TestJobTrackerRestart.java

@@ -471,8 +471,9 @@ public class TestJobTrackerRestart extends TestCase {
     }
     }
     
     
     TaskCompletionEvent[] prevEvents = 
     TaskCompletionEvent[] prevEvents = 
-      mr.getMapTaskCompletionEvents(id, 0, numMaps);
-    TaskReport[] prevReports = jobClient.getMapTaskReports(id);
+      mr.getTaskCompletionEvents(id, 0, numMaps);
+    TaskReport[] prevSetupReports = jobClient.getSetupTaskReports(id);
+    TaskReport[] prevMapReports = jobClient.getMapTaskReports(id);
     ClusterStatus prevStatus = jobClient.getClusterStatus();
     ClusterStatus prevStatus = jobClient.getClusterStatus();
     
     
     mr.stopJobTracker();
     mr.stopJobTracker();
@@ -502,7 +503,7 @@ public class TestJobTrackerRestart extends TestCase {
     
     
     // Get the new jobtrackers events
     // Get the new jobtrackers events
     TaskCompletionEvent[] jtEvents =  
     TaskCompletionEvent[] jtEvents =  
-      mr.getMapTaskCompletionEvents(id, 0, 2 * numMaps);
+      mr.getTaskCompletionEvents(id, 0, 2 * numMaps);
     
     
     // Test if all the events that were recovered match exactly
     // Test if all the events that were recovered match exactly
     testTaskCompletionEvents(prevEvents, jtEvents, false, numToMatch);
     testTaskCompletionEvents(prevEvents, jtEvents, false, numToMatch);
@@ -521,8 +522,10 @@ public class TestJobTrackerRestart extends TestCase {
     
     
     // Check the task reports
     // Check the task reports
     // The reports should match exactly if the attempts are same
     // The reports should match exactly if the attempts are same
-    TaskReport[] afterReports = jobClient.getMapTaskReports(id);
-    testTaskReports(prevReports, afterReports, numToMatch);
+    TaskReport[] afterMapReports = jobClient.getMapTaskReports(id);
+    TaskReport[] afterSetupReports = jobClient.getSetupTaskReports(id);
+    testTaskReports(prevMapReports, afterMapReports, numToMatch - 1);
+    testTaskReports(prevSetupReports, afterSetupReports, 1);
     
     
     //  Signal the reduce tasks
     //  Signal the reduce tasks
     signalTasks(dfs, fileSys, false, getMapSignalFile(shareDir), 
     signalTasks(dfs, fileSys, false, getMapSignalFile(shareDir), 
@@ -829,4 +832,4 @@ public class TestJobTrackerRestart extends TestCase {
   public static void main(String[] args) throws IOException {
   public static void main(String[] args) throws IOException {
     new TestJobTrackerRestart().testJobTrackerRestart();
     new TestJobTrackerRestart().testJobTrackerRestart();
   }
   }
-}
+}

+ 4 - 1
src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java

@@ -70,7 +70,10 @@ public class TestMiniMRLocalFS extends TestCase {
       // test the task report fetchers
       // test the task report fetchers
       JobClient client = new JobClient(job);
       JobClient client = new JobClient(job);
       JobID jobid = ret.job.getID();
       JobID jobid = ret.job.getID();
-      TaskReport[] reports = client.getMapTaskReports(jobid);
+      TaskReport[] reports;
+      reports = client.getSetupTaskReports(jobid);
+      assertEquals("number of setups", 2, reports.length);
+      reports = client.getMapTaskReports(jobid);
       assertEquals("number of maps", 1, reports.length);
       assertEquals("number of maps", 1, reports.length);
       reports = client.getReduceTaskReports(jobid);
       reports = client.getReduceTaskReports(jobid);
       assertEquals("number of reduces", 1, reports.length);
       assertEquals("number of reduces", 1, reports.length);

+ 6 - 3
src/webapps/job/jobdetails.jsp

@@ -88,15 +88,15 @@
               "</td></tr>\n");
               "</td></tr>\n");
   }
   }
 
 
-  private void printCleanupTaskSummary(JspWriter out,
+  private void printJobLevelTaskSummary(JspWriter out,
                                 String jobId,
                                 String jobId,
+                                String kind,
                                 TaskInProgress[] tasks
                                 TaskInProgress[] tasks
                                ) throws IOException {
                                ) throws IOException {
     int totalTasks = tasks.length;
     int totalTasks = tasks.length;
     int runningTasks = 0;
     int runningTasks = 0;
     int finishedTasks = 0;
     int finishedTasks = 0;
     int killedTasks = 0;
     int killedTasks = 0;
-    String kind = "cleanup";
     for(int i=0; i < totalTasks; ++i) {
     for(int i=0; i < totalTasks; ++i) {
       TaskInProgress task = tasks[i];
       TaskInProgress task = tasks[i];
       if (task.isComplete()) {
       if (task.isComplete()) {
@@ -208,6 +208,9 @@
     out.print("<b>Job Name:</b> " + profile.getJobName() + "<br>\n");
     out.print("<b>Job Name:</b> " + profile.getJobName() + "<br>\n");
     out.print("<b>Job File:</b> <a href=\"jobconf.jsp?jobid=" + jobId + "\">" 
     out.print("<b>Job File:</b> <a href=\"jobconf.jsp?jobid=" + jobId + "\">" 
               + profile.getJobFile() + "</a><br>\n");
               + profile.getJobFile() + "</a><br>\n");
+    out.print("<b>Job Setup:</b>");
+    printJobLevelTaskSummary(out, jobId, "setup", job.getSetupTasks());
+    out.print("<br>\n");
     if (runState == JobStatus.RUNNING) {
     if (runState == JobStatus.RUNNING) {
       out.print("<b>Status:</b> Running<br>\n");
       out.print("<b>Status:</b> Running<br>\n");
       out.print("<b>Started at:</b> " + new Date(job.getStartTime()) + "<br>\n");
       out.print("<b>Started at:</b> " + new Date(job.getStartTime()) + "<br>\n");
@@ -238,7 +241,7 @@
       }
       }
     }
     }
     out.print("<b>Job Cleanup:</b>");
     out.print("<b>Job Cleanup:</b>");
-    printCleanupTaskSummary(out, jobId, job.getCleanupTasks());
+    printJobLevelTaskSummary(out, jobId, "cleanup", job.getCleanupTasks());
     out.print("<br>\n");
     out.print("<br>\n");
     if (flakyTaskTrackers > 0) {
     if (flakyTaskTrackers > 0) {
       out.print("<b>Black-listed TaskTrackers:</b> " + 
       out.print("<b>Black-listed TaskTrackers:</b> " + 

+ 34 - 0
src/webapps/job/jobdetailshistory.jsp

@@ -42,6 +42,7 @@
     int totalMaps = 0 ; 
     int totalMaps = 0 ; 
     int totalReduces = 0;
     int totalReduces = 0;
     int totalCleanups = 0; 
     int totalCleanups = 0; 
+    int totalSetups = 0; 
     int numFailedMaps = 0; 
     int numFailedMaps = 0; 
     int numKilledMaps = 0;
     int numKilledMaps = 0;
     int numFailedReduces = 0 ; 
     int numFailedReduces = 0 ; 
@@ -49,6 +50,9 @@
     int numFinishedCleanups = 0;
     int numFinishedCleanups = 0;
     int numFailedCleanups = 0;
     int numFailedCleanups = 0;
     int numKilledCleanups = 0;
     int numKilledCleanups = 0;
+    int numFinishedSetups = 0;
+    int numFailedSetups = 0;
+    int numKilledSetups = 0;
 	
 	
     long mapStarted = 0 ; 
     long mapStarted = 0 ; 
     long mapFinished = 0 ; 
     long mapFinished = 0 ; 
@@ -56,6 +60,8 @@
     long reduceFinished = 0;
     long reduceFinished = 0;
     long cleanupStarted = 0;
     long cleanupStarted = 0;
     long cleanupFinished = 0; 
     long cleanupFinished = 0; 
+    long setupStarted = 0;
+    long setupFinished = 0; 
         
         
     Map <String,String> allHosts = new TreeMap<String,String>();
     Map <String,String> allHosts = new TreeMap<String,String>();
     for (JobHistory.Task task : tasks.values()) {
     for (JobHistory.Task task : tasks.values()) {
@@ -104,6 +110,21 @@
             numFailedCleanups++;
             numFailedCleanups++;
           } else if (Values.KILLED.name().equals(attempt.get(Keys.TASK_STATUS))) {
           } else if (Values.KILLED.name().equals(attempt.get(Keys.TASK_STATUS))) {
             numKilledCleanups++;
             numKilledCleanups++;
+          } 
+        } else if (Values.SETUP.name().equals(task.get(Keys.TASK_TYPE))) {
+          if (setupStarted==0||setupStarted > startTime) {
+            setupStarted = startTime ; 
+          }
+          if (setupFinished < finishTime) {
+            setupFinished = finishTime; 
+          }
+          totalSetups++; 
+          if (Values.SUCCESS.name().equals(attempt.get(Keys.TASK_STATUS))) {
+            numFinishedSetups++;
+          } else if (Values.FAILED.name().equals(attempt.get(Keys.TASK_STATUS))) {
+            numFailedSetups++;
+          } else if (Values.KILLED.name().equals(attempt.get(Keys.TASK_STATUS))) {
+            numKilledSetups++;
           }
           }
         }
         }
       }
       }
@@ -117,6 +138,19 @@
 <td>Kind</td><td>Total Tasks(successful+failed+killed)</td><td>Successful tasks</td><td>Failed tasks</td><td>Killed tasks</td><td>Start Time</td><td>Finish Time</td>
 <td>Kind</td><td>Total Tasks(successful+failed+killed)</td><td>Successful tasks</td><td>Failed tasks</td><td>Killed tasks</td><td>Start Time</td><td>Finish Time</td>
 </tr>
 </tr>
 <tr>
 <tr>
+<td>Setup</td>
+    <td><a href="jobtaskshistory.jsp?jobid=<%=jobid %>&logFile=<%=encodedLogFileName%>&taskType=<%=Values.SETUP.name() %>&status=all">
+        <%=totalSetups%></a></td>
+    <td><a href="jobtaskshistory.jsp?jobid=<%=jobid %>&logFile=<%=encodedLogFileName%>&taskType=<%=Values.SETUP.name() %>&status=<%=Values.SUCCESS %>">
+        <%=numFinishedSetups%></a></td>
+    <td><a href="jobtaskshistory.jsp?jobid=<%=jobid %>&logFile=<%=encodedLogFileName%>&taskType=<%=Values.SETUP.name() %>&status=<%=Values.FAILED %>">
+        <%=numFailedSetups%></a></td>
+    <td><a href="jobtaskshistory.jsp?jobid=<%=jobid %>&logFile=<%=encodedLogFileName%>&taskType=<%=Values.SETUP.name() %>&status=<%=Values.KILLED %>">
+        <%=numKilledSetups%></a></td>  
+    <td><%=StringUtils.getFormattedTimeWithDiff(dateFormat, setupStarted, 0) %></td>
+    <td><%=StringUtils.getFormattedTimeWithDiff(dateFormat, setupFinished, setupStarted) %></td>
+</tr>
+<tr>
 <td>Map</td>
 <td>Map</td>
     <td><a href="jobtaskshistory.jsp?jobid=<%=jobid %>&logFile=<%=encodedLogFileName%>&taskType=<%=Values.MAP.name() %>&status=all">
     <td><a href="jobtaskshistory.jsp?jobid=<%=jobid %>&logFile=<%=encodedLogFileName%>&taskType=<%=Values.MAP.name() %>&status=all">
         <%=totalMaps %></a></td>
         <%=totalMaps %></a></td>

+ 5 - 2
src/webapps/job/jobtasks.jsp

@@ -21,7 +21,7 @@
   }
   }
   String type = request.getParameter("type");
   String type = request.getParameter("type");
   String pagenum = request.getParameter("pagenum");
   String pagenum = request.getParameter("pagenum");
-  TaskInProgress[] tasks;
+  TaskInProgress[] tasks = null;
   String state = request.getParameter("state");
   String state = request.getParameter("state");
   state = (state!=null) ? state : "all";
   state = (state!=null) ? state : "all";
   int pnum = Integer.parseInt(pagenum);
   int pnum = Integer.parseInt(pagenum);
@@ -42,9 +42,12 @@
   else if ("reduce".equals(type)) {
   else if ("reduce".equals(type)) {
     reports = (job != null) ? tracker.getReduceTaskReports(jobidObj) : null;
     reports = (job != null) ? tracker.getReduceTaskReports(jobidObj) : null;
     tasks = (job != null) ? job.getReduceTasks() : null;
     tasks = (job != null) ? job.getReduceTasks() : null;
-  } else {
+  } else if ("cleanup".equals(type)) {
     reports = (job != null) ? tracker.getCleanupTaskReports(jobidObj) : null;
     reports = (job != null) ? tracker.getCleanupTaskReports(jobidObj) : null;
     tasks = (job != null) ? job.getCleanupTasks() : null;
     tasks = (job != null) ? job.getCleanupTasks() : null;
+  } else if ("setup".equals(type)) {
+    reports = (job != null) ? tracker.getSetupTaskReports(jobidObj) : null;
+    tasks = (job != null) ? job.getSetupTasks() : null;
   }
   }
 %>
 %>
 
 

+ 7 - 4
src/webapps/job/taskdetails.jsp

@@ -69,9 +69,12 @@
     }
     }
     TaskStatus[] ts = (job != null) ? tracker.getTaskStatuses(tipidObj)
     TaskStatus[] ts = (job != null) ? tracker.getTaskStatuses(tipidObj)
         : null;
         : null;
-    boolean isCleanup = false;
+    boolean isCleanupOrSetup = false;
     if (tipidObj != null) { 
     if (tipidObj != null) { 
-      isCleanup = job.getTaskInProgress(tipidObj).isCleanupTask();
+      isCleanupOrSetup = job.getTaskInProgress(tipidObj).isCleanupTask();
+      if (!isCleanupOrSetup) {
+        isCleanupOrSetup = job.getTaskInProgress(tipidObj).isSetupTask();
+      }
     }
     }
 %>
 %>
 
 
@@ -98,7 +101,7 @@
 <table border=2 cellpadding="5" cellspacing="2">
 <table border=2 cellpadding="5" cellspacing="2">
 <tr><td align="center">Task Attempts</td><td>Machine</td><td>Status</td><td>Progress</td><td>Start Time</td> 
 <tr><td align="center">Task Attempts</td><td>Machine</td><td>Status</td><td>Progress</td><td>Start Time</td> 
   <%
   <%
-   if (!ts[0].getIsMap() && !isCleanup) {
+   if (!ts[0].getIsMap() && !isCleanupOrSetup) {
    %>
    %>
 <td>Shuffle Finished</td><td>Sort Finished</td>
 <td>Shuffle Finished</td><td>Sort Finished</td>
   <%
   <%
@@ -126,7 +129,7 @@
         out.print("<td>"
         out.print("<td>"
           + StringUtils.getFormattedTimeWithDiff(dateFormat, status
           + StringUtils.getFormattedTimeWithDiff(dateFormat, status
           .getStartTime(), 0) + "</td>");
           .getStartTime(), 0) + "</td>");
-        if (!ts[i].getIsMap() && !isCleanup) {
+        if (!ts[i].getIsMap() && !isCleanupOrSetup) {
           out.print("<td>"
           out.print("<td>"
           + StringUtils.getFormattedTimeWithDiff(dateFormat, status
           + StringUtils.getFormattedTimeWithDiff(dateFormat, status
           .getShuffleFinishTime(), status.getStartTime()) + "</td>");
           .getShuffleFinishTime(), status.getStartTime()) + "</td>");

Some files were not shown because too many files changed in this diff