瀏覽代碼

HADOOP-4261. Adds a setup task for jobs. This is required so that we don't setup jobs that haven't been inited yet (since init could lead to job failure). Only after the init has successfully happened do we launch the setupJob task. Contributed by Amareshwari Sriramadasu.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/trunk@702360 13f79535-47bb-0310-9956-ffa450edef68
Devaraj Das 16 年之前
父節點
當前提交
0ffc659dfb
共有 27 個文件被更改,包括 576 次插入187 次删除
  1. 5 0
      CHANGES.txt
  2. 6 2
      docs/changes.html
  3. 2 4
      docs/mapred_tutorial.html
  4. 1 1
      docs/mapred_tutorial.pdf
  5. 2 4
      src/docs/src/documentation/content/xdocs/mapred_tutorial.xml
  6. 1 1
      src/mapred/org/apache/hadoop/mapred/EagerTaskInitializationListener.java
  7. 35 0
      src/mapred/org/apache/hadoop/mapred/HistoryViewer.java
  8. 21 8
      src/mapred/org/apache/hadoop/mapred/JobClient.java
  9. 83 54
      src/mapred/org/apache/hadoop/mapred/JobHistory.java
  10. 208 79
      src/mapred/org/apache/hadoop/mapred/JobInProgress.java
  11. 35 0
      src/mapred/org/apache/hadoop/mapred/JobStatus.java
  12. 8 1
      src/mapred/org/apache/hadoop/mapred/JobSubmissionProtocol.java
  13. 47 5
      src/mapred/org/apache/hadoop/mapred/JobTracker.java
  14. 5 0
      src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java
  15. 4 0
      src/mapred/org/apache/hadoop/mapred/MapTask.java
  16. 1 5
      src/mapred/org/apache/hadoop/mapred/OutputCommitter.java
  17. 5 1
      src/mapred/org/apache/hadoop/mapred/ReduceTask.java
  18. 9 0
      src/mapred/org/apache/hadoop/mapred/RunningJob.java
  19. 14 0
      src/mapred/org/apache/hadoop/mapred/Task.java
  20. 17 4
      src/mapred/org/apache/hadoop/mapred/TaskInProgress.java
  21. 2 2
      src/test/org/apache/hadoop/mapred/MiniMRCluster.java
  22. 9 6
      src/test/org/apache/hadoop/mapred/TestJobTrackerRestart.java
  23. 4 1
      src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java
  24. 6 3
      src/webapps/job/jobdetails.jsp
  25. 34 0
      src/webapps/job/jobdetailshistory.jsp
  26. 5 2
      src/webapps/job/jobtasks.jsp
  27. 7 4
      src/webapps/job/taskdetails.jsp

+ 5 - 0
CHANGES.txt

@@ -859,6 +859,11 @@ Release 0.19.0 - Unreleased
     HADOOP-4163. Report FSErrors from map output fetch threads instead of
     merely logging them. (Sharad Agarwal via cdouglas)
 
+    HADOOP-4261. Adds a setup task for jobs. This is required so that we 
+    don't setup jobs that haven't been inited yet (since init could lead
+    to job failure). Only after the init has successfully happened do we 
+    launch the setupJob task. (Amareshwari Sriramadasu via ddas)
+
 Release 0.18.2 - Unreleased
 
   BUG FIXES

+ 6 - 2
docs/changes.html

@@ -259,7 +259,7 @@ changes from the prior release.<br />(cutting)</li>
     </ol>
   </li>
   <li><a href="javascript:toggleList('release_0.19.0_-_unreleased_._improvements_')">  IMPROVEMENTS
-</a>&nbsp;&nbsp;&nbsp;(70)
+</a>&nbsp;&nbsp;&nbsp;(72)
     <ol id="release_0.19.0_-_unreleased_._improvements_">
       <li><a href="http://issues.apache.org/jira/browse/HADOOP-4205">HADOOP-4205</a>. hive: metastore and ql to use the refactored SerDe library.<br />(zshao)</li>
       <li><a href="http://issues.apache.org/jira/browse/HADOOP-4106">HADOOP-4106</a>. libhdfs: add time, permission and user attribute support
@@ -387,6 +387,8 @@ better places.<br />(Sanjay Radia via rangadi)</li>
 make itermitant failures reproducible.<br />(szetszwo via cdouglas)</li>
       <li><a href="http://issues.apache.org/jira/browse/HADOOP-4209">HADOOP-4209</a>. Remove the change to the format of task attempt id by
 incrementing the task attempt numbers by 1000 when the job restarts.<br />(Amar Kamat via omalley)</li>
+      <li><a href="http://issues.apache.org/jira/browse/HADOOP-4301">HADOOP-4301</a>. Adds forrest doc for the skip bad records feature.<br />(Sharad Agarwal via ddas)</li>
+      <li><a href="http://issues.apache.org/jira/browse/HADOOP-4354">HADOOP-4354</a>. Separate TestDatanodeDeath.testDatanodeDeath() into 4 tests.<br />(szetszwo)</li>
     </ol>
   </li>
   <li><a href="javascript:toggleList('release_0.19.0_-_unreleased_._optimizations_')">  OPTIMIZATIONS
@@ -413,7 +415,7 @@ it from a different .crc file.<br />(Jothi Padmanabhan via ddas)</li>
     </ol>
   </li>
   <li><a href="javascript:toggleList('release_0.19.0_-_unreleased_._bug_fixes_')">  BUG FIXES
-</a>&nbsp;&nbsp;&nbsp;(108)
+</a>&nbsp;&nbsp;&nbsp;(109)
     <ol id="release_0.19.0_-_unreleased_._bug_fixes_">
       <li><a href="http://issues.apache.org/jira/browse/HADOOP-3563">HADOOP-3563</a>.  Refactor the distributed upgrade code so that it is
 easier to identify datanode and namenode related code.<br />(dhruba)</li>
@@ -621,6 +623,8 @@ told to read unlesss end-of-file is reached.<br />(Pete Wyckoff via dhruba)</li>
 retries for fetching map-outputs; also fixed the case where the reducer
 automatically kills on too many unique map-outputs could not be fetched
 for small jobs.<br />(Amareshwari Sri Ramadasu via acmurthy)</li>
+      <li><a href="http://issues.apache.org/jira/browse/HADOOP-4163">HADOOP-4163</a>. Report FSErrors from map output fetch threads instead of
+merely logging them.<br />(Sharad Agarwal via cdouglas)</li>
     </ol>
   </li>
 </ul>

+ 2 - 4
docs/mapred_tutorial.html

@@ -2186,14 +2186,12 @@ document.write("Last Published: " + document.lastModified);
 <li>
             Setup the job during initialization. For example, create
             the temporary output directory for the job during the
-            initialization of the job. The job client does the setup
-            for the job.
+            initialization of the job. 
           </li>
           
 <li>
             Cleanup the job after the job completion. For example, remove the
-            temporary output directory after the job completion. A separate 
-            task does the cleanupJob at the end of the job.
+            temporary output directory after the job completion.
           </li>
           
 <li>

文件差異過大導致無法顯示
+ 1 - 1
docs/mapred_tutorial.pdf


+ 2 - 4
src/docs/src/documentation/content/xdocs/mapred_tutorial.xml

@@ -1594,13 +1594,11 @@
           <li>
             Setup the job during initialization. For example, create
             the temporary output directory for the job during the
-            initialization of the job. The job client does the setup
-            for the job.
+            initialization of the job. 
           </li>
           <li>
             Cleanup the job after the job completion. For example, remove the
-            temporary output directory after the job completion. A separate 
-            task does the cleanupJob at the end of the job.
+            temporary output directory after the job completion.
           </li>
           <li>
             Setup the task temporary output.

+ 1 - 1
src/mapred/org/apache/hadoop/mapred/EagerTaskInitializationListener.java

@@ -58,7 +58,7 @@ class EagerTaskInitializationListener extends JobInProgressListener {
           LOG.error("Job initialization failed:\n" +
                     StringUtils.stringifyException(t));
           if (job != null) {
-            job.fail();
+            job.terminateJob(JobStatus.FAILED);
           }
         }
       }

+ 35 - 0
src/mapred/org/apache/hadoop/mapred/HistoryViewer.java

@@ -93,6 +93,8 @@ class HistoryViewer {
     printJobDetails();
     printTaskSummary();
     printJobAnalysis();
+    printTasks("SETUP", "FAILED");
+    printTasks("SETUP", "KILLED");
     printTasks("MAP", "FAILED");
     printTasks("MAP", "KILLED");
     printTasks("REDUCE", "FAILED");
@@ -100,9 +102,11 @@ class HistoryViewer {
     printTasks("CLEANUP", "FAILED");
     printTasks("CLEANUP", "KILLED");
     if (printAll) {
+      printTasks("SETUP", "SUCCESS");
       printTasks("MAP", "SUCCESS");
       printTasks("REDUCE", "SUCCESS");
       printTasks("CLEANUP", "SUCCESS");
+      printAllTaskAttempts("SETUP");
       printAllTaskAttempts("MAP");
       printAllTaskAttempts("REDUCE");
       printAllTaskAttempts("CLEANUP");
@@ -219,6 +223,7 @@ class HistoryViewer {
     int totalMaps = 0; 
     int totalReduces = 0; 
     int totalCleanups = 0;
+    int totalSetups = 0;
     int numFailedMaps = 0; 
     int numKilledMaps = 0;
     int numFailedReduces = 0; 
@@ -226,12 +231,17 @@ class HistoryViewer {
     int numFinishedCleanups = 0;
     int numFailedCleanups = 0;
     int numKilledCleanups = 0;
+    int numFinishedSetups = 0;
+    int numFailedSetups = 0;
+    int numKilledSetups = 0;
     long mapStarted = 0; 
     long mapFinished = 0; 
     long reduceStarted = 0; 
     long reduceFinished = 0; 
     long cleanupStarted = 0;
     long cleanupFinished = 0;
+    long setupStarted = 0;
+    long setupFinished = 0;
 
     Map <String, String> allHosts = new TreeMap<String, String>();
 
@@ -286,6 +296,23 @@ class HistoryViewer {
                                             attempt.get(Keys.TASK_STATUS))) {
             numKilledCleanups++;
           }
+        } else if (Values.SETUP.name().equals(task.get(Keys.TASK_TYPE))){
+          if (setupStarted==0||setupStarted > startTime) {
+            setupStarted = startTime; 
+          }
+          if (setupFinished < finishTime) {
+            setupFinished = finishTime; 
+          }
+          totalSetups++; 
+          if (Values.SUCCESS.name().equals(attempt.get(Keys.TASK_STATUS))) {
+            numFinishedSetups++;
+          } else if (Values.FAILED.name().equals(
+                                            attempt.get(Keys.TASK_STATUS))) {
+            numFailedSetups++;
+          } else if (Values.KILLED.name().equals(
+                                            attempt.get(Keys.TASK_STATUS))) {
+            numKilledSetups++;
+          }
         }
       }
     }
@@ -296,6 +323,14 @@ class HistoryViewer {
     taskSummary.append("\nKind\tTotal\t");
     taskSummary.append("Successful\tFailed\tKilled\tStartTime\tFinishTime");
     taskSummary.append("\n");
+    taskSummary.append("\nSetup\t").append(totalSetups);
+    taskSummary.append("\t").append(numFinishedSetups);
+    taskSummary.append("\t\t").append(numFailedSetups);
+    taskSummary.append("\t").append(numKilledSetups);
+    taskSummary.append("\t").append(StringUtils.getFormattedTimeWithDiff(
+                               dateFormat, setupStarted, 0));
+    taskSummary.append("\t").append(StringUtils.getFormattedTimeWithDiff(
+                               dateFormat, setupFinished, setupStarted)); 
     taskSummary.append("\nMap\t").append(totalMaps);
     taskSummary.append("\t").append(job.getInt(Keys.FINISHED_MAPS));
     taskSummary.append("\t\t").append(numFailedMaps);

+ 21 - 8
src/mapred/org/apache/hadoop/mapred/JobClient.java

@@ -261,6 +261,15 @@ public class JobClient extends Configured implements MRConstants, Tool  {
       return status.cleanupProgress();
     }
 
+    /**
+     * A float between 0.0 and 1.0, indicating the % of setup work
+     * completed.
+     */
+    public float setupProgress() throws IOException {
+      ensureFreshStatus();
+      return status.setupProgress();
+    }
+
     /**
      * Returns immediately whether the whole job is done yet or not.
      */
@@ -813,13 +822,6 @@ public class JobClient extends Configured implements MRConstants, Tool  {
       out.close();
     }
 
-    // skip doing setup if there are no maps for the job.
-    // because if there are no maps, job is considered completed and successful
-    if (splits.length != 0) {
-      // do setupJob
-      job.getOutputCommitter().setupJob(new JobContext(job));
-    }
-
     //
     // Now, actually submit the job (using the submit name)
     //
@@ -1039,7 +1041,18 @@ public class JobClient extends Configured implements MRConstants, Tool  {
   public TaskReport[] getCleanupTaskReports(JobID jobId) throws IOException {
     return jobSubmitClient.getCleanupTaskReports(jobId);
   }
-  
+
+  /**
+   * Get the information of the current state of the setup tasks of a job.
+   * 
+   * @param jobId the job to query.
+   * @return the list of all of the setup tips.
+   * @throws IOException
+   */    
+  public TaskReport[] getSetupTaskReports(JobID jobId) throws IOException {
+    return jobSubmitClient.getSetupTaskReports(jobId);
+  }
+
   /**@deprecated Applications should rather use {@link #getReduceTaskReports(JobID)}*/
   @Deprecated
   public TaskReport[] getReduceTaskReports(String jobId) throws IOException {

+ 83 - 54
src/mapred/org/apache/hadoop/mapred/JobHistory.java

@@ -123,7 +123,7 @@ public class JobHistory {
    * most places in history file. 
    */
   public static enum Values {
-    SUCCESS, FAILED, KILLED, MAP, REDUCE, CLEANUP, RUNNING
+    SUCCESS, FAILED, KILLED, MAP, REDUCE, CLEANUP, RUNNING, PREP, SETUP
   }
 
   // temp buffer for parsed dataa
@@ -923,12 +923,14 @@ public class JobHistory {
     }
     /**
      * Logs launch time of job. 
+     * 
      * @param jobId job id, assigned by jobtracker. 
      * @param startTime start time of job. 
      * @param totalMaps total maps assigned by jobtracker. 
      * @param totalReduces total reduces. 
      */
-    public static void logStarted(JobID jobId, long startTime, int totalMaps, int totalReduces){
+    public static void logInited(JobID jobId, long startTime, 
+                                 int totalMaps, int totalReduces) {
       if (!disableHistory){
         String logFileKey =  JOBTRACKER_UNIQUE_STRING + jobId; 
         ArrayList<PrintWriter> writer = openJobs.get(logFileKey); 
@@ -940,10 +942,45 @@ public class JobHistory {
               new String[] {jobId.toString(), String.valueOf(startTime), 
                             String.valueOf(totalMaps), 
                             String.valueOf(totalReduces), 
+                            Values.PREP.name()}); 
+        }
+      }
+    }
+    
+   /**
+     * Logs the job as RUNNING. 
+     *
+     * @param jobId job id, assigned by jobtracker. 
+     * @param startTime start time of job. 
+     * @param totalMaps total maps assigned by jobtracker. 
+     * @param totalReduces total reduces. 
+     * @deprecated Use {@link #logInited(JobID, long, int, int)} and 
+     * {@link #logStarted(JobID)}
+     */
+    @Deprecated
+    public static void logStarted(JobID jobId, long startTime, 
+                                  int totalMaps, int totalReduces) {
+      logStarted(jobId);
+    }
+    
+    /**
+     * Logs job as running 
+     * @param jobId job id, assigned by jobtracker. 
+     */
+    public static void logStarted(JobID jobId){
+      if (!disableHistory){
+        String logFileKey =  JOBTRACKER_UNIQUE_STRING + jobId; 
+        ArrayList<PrintWriter> writer = openJobs.get(logFileKey); 
+
+        if (null != writer){
+          JobHistory.log(writer, RecordTypes.Job, 
+              new Keys[] {Keys.JOBID, Keys.JOB_STATUS},
+              new String[] {jobId.toString(),  
                             Values.RUNNING.name()}); 
         }
       }
     }
+    
     /**
      * Log job finished. closes the job file in history. 
      * @param jobId job id, assigned by jobtracker. 
@@ -1197,12 +1234,11 @@ public class JobHistory {
      * @param startTime start time of task attempt as reported by task tracker. 
      * @param hostName host name of the task attempt. 
      * @deprecated Use 
-     *             {@link #logStarted(TaskAttemptID, long, String, int, 
-     *                                boolean)}
+     *             {@link #logStarted(TaskAttemptID, long, String, int, String)}
      */
     @Deprecated
     public static void logStarted(TaskAttemptID taskAttemptId, long startTime, String hostName){
-      logStarted(taskAttemptId, startTime, hostName, -1, false);
+      logStarted(taskAttemptId, startTime, hostName, -1, Values.MAP.name());
     }
     
     /**
@@ -1212,11 +1248,11 @@ public class JobHistory {
      * @param startTime start time of task attempt as reported by task tracker. 
      * @param trackerName name of the tracker executing the task attempt.
      * @param httpPort http port of the task tracker executing the task attempt
-     * @param isCleanup Whether the attempt is cleanup or not 
+     * @param taskType Whether the attempt is cleanup or setup or map 
      */
     public static void logStarted(TaskAttemptID taskAttemptId, long startTime,
                                   String trackerName, int httpPort, 
-                                  boolean isCleanup){
+                                  String taskType) {
       if (!disableHistory){
         ArrayList<PrintWriter> writer = openJobs.get(JOBTRACKER_UNIQUE_STRING 
                                                    + taskAttemptId.getJobID()); 
@@ -1226,8 +1262,7 @@ public class JobHistory {
                          new Keys[]{ Keys.TASK_TYPE, Keys.TASKID, 
                                      Keys.TASK_ATTEMPT_ID, Keys.START_TIME, 
                                      Keys.TRACKER_NAME, Keys.HTTP_PORT},
-                         new String[]{isCleanup ? Values.CLEANUP.name() : 
-                                                  Values.MAP.name(),
+                         new String[]{taskType,
                                       taskAttemptId.getTaskID().toString(), 
                                       taskAttemptId.toString(), 
                                       String.valueOf(startTime), trackerName,
@@ -1242,13 +1277,12 @@ public class JobHistory {
      * @param finishTime finish time
      * @param hostName host name 
      * @deprecated Use 
-     * {@link #logFinished(TaskAttemptID, long, String, boolean, String, 
-     *                     Counters)}
+     * {@link #logFinished(TaskAttemptID, long, String, String, String, Counters)}
      */
     @Deprecated
     public static void logFinished(TaskAttemptID taskAttemptId, long finishTime, 
                                    String hostName){
-      logFinished(taskAttemptId, finishTime, hostName, false, "", 
+      logFinished(taskAttemptId, finishTime, hostName, Values.MAP.name(), "", 
                   new Counters());
     }
 
@@ -1258,14 +1292,15 @@ public class JobHistory {
      * @param taskAttemptId task attempt id 
      * @param finishTime finish time
      * @param hostName host name 
-     * @param isCleanup Whether the attempt is cleanup or not 
+     * @param taskType Whether the attempt is cleanup or setup or map 
      * @param stateString state string of the task attempt
      * @param counter counters of the task attempt
      */
     public static void logFinished(TaskAttemptID taskAttemptId, 
                                    long finishTime, 
                                    String hostName,
-                                   boolean isCleanup, String stateString, 
+                                   String taskType,
+                                   String stateString, 
                                    Counters counter) {
       if (!disableHistory){
         ArrayList<PrintWriter> writer = openJobs.get(JOBTRACKER_UNIQUE_STRING 
@@ -1277,8 +1312,7 @@ public class JobHistory {
                                      Keys.TASK_ATTEMPT_ID, Keys.TASK_STATUS, 
                                      Keys.FINISH_TIME, Keys.HOSTNAME, 
                                      Keys.STATE_STRING, Keys.COUNTERS},
-                         new String[]{isCleanup ? Values.CLEANUP.name() : 
-                                                  Values.MAP.name(), 
+                         new String[]{taskType, 
                                       taskAttemptId.getTaskID().toString(),
                                       taskAttemptId.toString(), 
                                       Values.SUCCESS.name(),  
@@ -1296,13 +1330,13 @@ public class JobHistory {
      * @param hostName hostname of this task attempt.
      * @param error error message if any for this task attempt.
      * @deprecated Use
-     * {@link #logFailed(TaskAttemptID, long, String, String, boolean)} 
+     * {@link #logFailed(TaskAttemptID, long, String, String, String)} 
      */
     @Deprecated
     public static void logFailed(TaskAttemptID taskAttemptId, 
                                  long timestamp, String hostName, 
                                  String error) {
-      logFailed(taskAttemptId, timestamp, hostName, error, false);
+      logFailed(taskAttemptId, timestamp, hostName, error, Values.MAP.name());
     }
 
     /**
@@ -1312,11 +1346,11 @@ public class JobHistory {
      * @param timestamp timestamp
      * @param hostName hostname of this task attempt.
      * @param error error message if any for this task attempt. 
-     * @param isCleanup Whether the attempt is cleanup or not 
+     * @param taskType Whether the attempt is cleanup or setup or map 
      */
     public static void logFailed(TaskAttemptID taskAttemptId, 
                                  long timestamp, String hostName, 
-                                 String error, boolean isCleanup) {
+                                 String error, String taskType) {
       if (!disableHistory){
         ArrayList<PrintWriter> writer = openJobs.get(JOBTRACKER_UNIQUE_STRING 
                                                    + taskAttemptId.getJobID()); 
@@ -1326,8 +1360,7 @@ public class JobHistory {
                          new Keys[]{Keys.TASK_TYPE, Keys.TASKID, 
                                     Keys.TASK_ATTEMPT_ID, Keys.TASK_STATUS, 
                                     Keys.FINISH_TIME, Keys.HOSTNAME, Keys.ERROR},
-                         new String[]{ isCleanup ? Values.CLEANUP.name() :
-                                                   Values.MAP.name(), 
+                         new String[]{ taskType, 
                                        taskAttemptId.getTaskID().toString(),
                                        taskAttemptId.toString(), 
                                        Values.FAILED.name(),
@@ -1344,12 +1377,12 @@ public class JobHistory {
      * @param hostName hostname of this task attempt.
      * @param error error message if any for this task attempt. 
      * @deprecated Use 
-     * {@link #logKilled(TaskAttemptID, long, String, String, boolean)}
+     * {@link #logKilled(TaskAttemptID, long, String, String, String)}
      */
     @Deprecated
     public static void logKilled(TaskAttemptID taskAttemptId, 
                                  long timestamp, String hostName, String error){
-      logKilled(taskAttemptId, timestamp, hostName, error, false);
+      logKilled(taskAttemptId, timestamp, hostName, error, Values.MAP.name());
     } 
     
     /**
@@ -1359,11 +1392,11 @@ public class JobHistory {
      * @param timestamp timestamp
      * @param hostName hostname of this task attempt.
      * @param error error message if any for this task attempt. 
-     * @param isCleanup Whether the attempt is cleanup or not 
+     * @param taskType Whether the attempt is cleanup or setup or map 
      */
     public static void logKilled(TaskAttemptID taskAttemptId, 
                                  long timestamp, String hostName,
-                                 String error, boolean isCleanup){
+                                 String error, String taskType) {
       if (!disableHistory){
         ArrayList<PrintWriter> writer = openJobs.get(JOBTRACKER_UNIQUE_STRING 
                                                    + taskAttemptId.getJobID()); 
@@ -1374,8 +1407,7 @@ public class JobHistory {
                                     Keys.TASK_ATTEMPT_ID, Keys.TASK_STATUS, 
                                     Keys.FINISH_TIME, Keys.HOSTNAME,
                                     Keys.ERROR},
-                         new String[]{ isCleanup ? Values.CLEANUP.name() : 
-                                                   Values.MAP.name(), 
+                         new String[]{ taskType, 
                                        taskAttemptId.getTaskID().toString(), 
                                        taskAttemptId.toString(),
                                        Values.KILLED.name(),
@@ -1396,12 +1428,12 @@ public class JobHistory {
      * @param startTime start time
      * @param hostName host name 
      * @deprecated Use 
-     * {@link #logStarted(TaskAttemptID, long, String, int, boolean)}
+     * {@link #logStarted(TaskAttemptID, long, String, int, String)}
      */
     @Deprecated
     public static void logStarted(TaskAttemptID taskAttemptId, 
                                   long startTime, String hostName){
-      logStarted(taskAttemptId, startTime, hostName, -1, false);
+      logStarted(taskAttemptId, startTime, hostName, -1, Values.REDUCE.name());
     }
     
     /**
@@ -1411,11 +1443,12 @@ public class JobHistory {
      * @param startTime start time
      * @param trackerName tracker name 
      * @param httpPort the http port of the tracker executing the task attempt
-     * @param isCleanup Whether the attempt is cleanup or not 
+     * @param taskType Whether the attempt is cleanup or setup or reduce 
      */
     public static void logStarted(TaskAttemptID taskAttemptId, 
                                   long startTime, String trackerName, 
-                                  int httpPort, boolean isCleanup) {
+                                  int httpPort, 
+                                  String taskType) {
       if (!disableHistory){
         ArrayList<PrintWriter> writer = openJobs.get(JOBTRACKER_UNIQUE_STRING 
                                                    + taskAttemptId.getJobID()); 
@@ -1425,8 +1458,7 @@ public class JobHistory {
                          new Keys[]{  Keys.TASK_TYPE, Keys.TASKID, 
                                       Keys.TASK_ATTEMPT_ID, Keys.START_TIME,
                                       Keys.TRACKER_NAME, Keys.HTTP_PORT},
-                         new String[]{isCleanup ? Values.CLEANUP.name() : 
-                                                  Values.REDUCE.name(),
+                         new String[]{taskType,
                                       taskAttemptId.getTaskID().toString(), 
                                       taskAttemptId.toString(), 
                                       String.valueOf(startTime), trackerName,
@@ -1443,15 +1475,15 @@ public class JobHistory {
      * @param finishTime finish time of task
      * @param hostName host name where task attempt executed
      * @deprecated Use 
-     * {@link #logFinished(TaskAttemptID, long, long, long, String, boolean, 
-     *                     String, Counters)}
+     * {@link #logFinished(TaskAttemptID, long, long, long, String, String, String, Counters)}
      */
     @Deprecated
     public static void logFinished(TaskAttemptID taskAttemptId, long shuffleFinished, 
                                    long sortFinished, long finishTime, 
                                    String hostName){
       logFinished(taskAttemptId, shuffleFinished, sortFinished, 
-                  finishTime, hostName, false, "", new Counters());
+                  finishTime, hostName, Values.REDUCE.name(),
+                  "", new Counters());
     }
     
     /**
@@ -1462,14 +1494,14 @@ public class JobHistory {
      * @param sortFinished sort finish time
      * @param finishTime finish time of task
      * @param hostName host name where task attempt executed
-     * @param isCleanup Whether the attempt is cleanup or not 
+     * @param taskType Whether the attempt is cleanup or setup or reduce 
      * @param stateString the state string of the attempt
      * @param counter counters of the attempt
      */
     public static void logFinished(TaskAttemptID taskAttemptId, 
                                    long shuffleFinished, 
                                    long sortFinished, long finishTime, 
-                                   String hostName, boolean isCleanup,
+                                   String hostName, String taskType,
                                    String stateString, Counters counter) {
       if (!disableHistory){
         ArrayList<PrintWriter> writer = openJobs.get(JOBTRACKER_UNIQUE_STRING 
@@ -1482,8 +1514,7 @@ public class JobHistory {
                                      Keys.SHUFFLE_FINISHED, Keys.SORT_FINISHED,
                                      Keys.FINISH_TIME, Keys.HOSTNAME, 
                                      Keys.STATE_STRING, Keys.COUNTERS},
-                         new String[]{isCleanup ? Values.CLEANUP.name() : 
-                                                  Values.REDUCE.name(),
+                         new String[]{taskType,
                                       taskAttemptId.getTaskID().toString(), 
                                       taskAttemptId.toString(), 
                                       Values.SUCCESS.name(), 
@@ -1503,12 +1534,12 @@ public class JobHistory {
      * @param hostName host name of the task attempt.  
      * @param error error message of the task.
      * @deprecated Use 
-     * {@link #logFailed(TaskAttemptID, long, String, String, boolean)} 
+     * {@link #logFailed(TaskAttemptID, long, String, String, String)} 
      */
     @Deprecated
     public static void logFailed(TaskAttemptID taskAttemptId, long timestamp, 
                                  String hostName, String error){
-      logFailed(taskAttemptId, timestamp, hostName, error, false);
+      logFailed(taskAttemptId, timestamp, hostName, error, Values.REDUCE.name());
     }
     
     /**
@@ -1518,11 +1549,11 @@ public class JobHistory {
      * @param timestamp time stamp when task failed
      * @param hostName host name of the task attempt.  
      * @param error error message of the task. 
-     * @param isCleanup Whether the attempt is cleanup or not 
+     * @param taskType Whether the attempt is cleanup or setup or reduce 
      */
     public static void logFailed(TaskAttemptID taskAttemptId, long timestamp, 
                                  String hostName, String error, 
-                                 boolean isCleanup) {
+                                 String taskType) {
       if (!disableHistory){
         ArrayList<PrintWriter> writer = openJobs.get(JOBTRACKER_UNIQUE_STRING 
                                                    + taskAttemptId.getJobID()); 
@@ -1533,8 +1564,7 @@ public class JobHistory {
                                       Keys.TASK_ATTEMPT_ID, Keys.TASK_STATUS, 
                                       Keys.FINISH_TIME, Keys.HOSTNAME,
                                       Keys.ERROR },
-                         new String[]{ isCleanup ? Values.CLEANUP.name() : 
-                                                   Values.REDUCE.name(), 
+                         new String[]{ taskType, 
                                        taskAttemptId.getTaskID().toString(), 
                                        taskAttemptId.toString(), 
                                        Values.FAILED.name(), 
@@ -1550,12 +1580,12 @@ public class JobHistory {
      * @param hostName host name of the task attempt.  
      * @param error error message of the task.
      * @deprecated Use 
-     * {@link #logKilled(TaskAttemptID, long, String, String, boolean)} 
+     * {@link #logKilled(TaskAttemptID, long, String, String, String)} 
      */
     @Deprecated
     public static void logKilled(TaskAttemptID taskAttemptId, long timestamp, 
                                  String hostName, String error) {
-      logKilled(taskAttemptId, timestamp, hostName, error, false);
+      logKilled(taskAttemptId, timestamp, hostName, error, Values.REDUCE.name());
     }
     
     /**
@@ -1565,11 +1595,11 @@ public class JobHistory {
      * @param timestamp time stamp when task failed
      * @param hostName host name of the task attempt.  
      * @param error error message of the task. 
-     * @param isCleanup Whether the attempt is cleanup or not 
-     */
+     * @param taskType Whether the attempt is cleanup or setup or reduce 
+    */
     public static void logKilled(TaskAttemptID taskAttemptId, long timestamp, 
                                  String hostName, String error, 
-                                 boolean isCleanup) {
+                                 String taskType) {
       if (!disableHistory){
         ArrayList<PrintWriter> writer = openJobs.get(JOBTRACKER_UNIQUE_STRING 
                                                    + taskAttemptId.getJobID()); 
@@ -1580,8 +1610,7 @@ public class JobHistory {
                                       Keys.TASK_ATTEMPT_ID, Keys.TASK_STATUS, 
                                       Keys.FINISH_TIME, Keys.HOSTNAME, 
                                       Keys.ERROR },
-                         new String[]{ isCleanup ? Values.CLEANUP.name() : 
-                                                   Values.REDUCE.name(),
+                         new String[]{ taskType,
                                        taskAttemptId.getTaskID().toString(), 
                                        taskAttemptId.toString(), 
                                        Values.KILLED.name(), 

+ 208 - 79
src/mapred/org/apache/hadoop/mapred/JobInProgress.java

@@ -63,6 +63,7 @@ class JobInProgress {
   TaskInProgress maps[] = new TaskInProgress[0];
   TaskInProgress reduces[] = new TaskInProgress[0];
   TaskInProgress cleanup[] = new TaskInProgress[0];
+  TaskInProgress setup[] = new TaskInProgress[0];
   int numMapTasks = 0;
   int numReduceTasks = 0;
   
@@ -83,6 +84,7 @@ class JobInProgress {
   int failedMapTIPs = 0;
   int failedReduceTIPs = 0;
   private volatile boolean launchedCleanup = false;
+  private volatile boolean launchedSetup = false;
   private volatile boolean jobKilled = false;
   private volatile boolean jobFailed = false;
 
@@ -382,12 +384,13 @@ class JobInProgress {
       // Finished time need to be setted here to prevent this job to be retired
       // from the job tracker jobs at the next retire iteration.
       this.finishTime = this.launchTime;
+      status.setSetupProgress(1.0f);
       status.setMapProgress(1.0f);
       status.setReduceProgress(1.0f);
       status.setCleanupProgress(1.0f);
       status.setRunState(JobStatus.SUCCEEDED);
       tasksInited.set(true);
-      JobHistory.JobInfo.logStarted(profile.getJobID(), 
+      JobHistory.JobInfo.logInited(profile.getJobID(), 
                                     this.launchTime, 0, 0);
       JobHistory.JobInfo.logFinished(profile.getJobID(), 
                                      this.finishTime, 0, 0, 0, 0,
@@ -422,12 +425,22 @@ class JobInProgress {
                        numReduceTasks, jobtracker, conf, this);
     cleanup[1].setCleanupTask();
 
-    this.status = new JobStatus(status.getJobID(), 0.0f, 0.0f, 0.0f, 
-                                          JobStatus.RUNNING, status.getJobPriority());
+    // create two setup tips, one map and one reduce.
+    setup = new TaskInProgress[2];
+    // setup map tip. This map is doesn't use split. 
+    // Just assign splits[0]
+    setup[0] = new TaskInProgress(jobId, jobFile, splits[0], 
+            jobtracker, conf, this, numMapTasks + 1 );
+    setup[0].setSetupTask();
+
+    // setup reduce tip.
+    setup[1] = new TaskInProgress(jobId, jobFile, numMapTasks,
+                       numReduceTasks + 1, jobtracker, conf, this);
+    setup[1].setSetupTask();
+    
     tasksInited.set(true);
-        
-    JobHistory.JobInfo.logStarted(profile.getJobID(), this.launchTime, 
-                                  numMapTasks, numReduceTasks);
+    JobHistory.JobInfo.logInited(profile.getJobID(), this.launchTime, 
+                                 numMapTasks, numReduceTasks);
   }
 
   /////////////////////////////////////////////////////
@@ -532,6 +545,14 @@ class JobInProgress {
     return cleanup;
   }
   
+  /**
+   * Get the list of setup tasks
+   * @return the array of setup tasks for the job
+   */
+  TaskInProgress[] getSetupTasks() {
+    return setup;
+  }
+  
   /**
    * Get the list of reduce tasks
    * @return the raw array of reduce tasks for this job
@@ -611,6 +632,21 @@ class JobInProgress {
     return results;
   }
 
+  /**
+   * Return a vector of setup TaskInProgress objects
+   */
+  public synchronized Vector<TaskInProgress> reportSetupTIPs(
+                                               boolean shouldBeComplete) {
+    
+    Vector<TaskInProgress> results = new Vector<TaskInProgress>();
+    for (int i = 0; i < setup.length; i++) {
+      if (setup[i].isComplete() == shouldBeComplete) {
+        results.add(setup[i]);
+      }
+    }
+    return results;
+  }
+
   ////////////////////////////////////////////////////
   // Status update methods
   ////////////////////////////////////////////////////
@@ -655,7 +691,9 @@ class JobInProgress {
                                             taskCompletionEventTracker, 
                                             taskid,
                                             tip.idWithinJob(),
-                                            status.getIsMap(),
+                                            status.getIsMap() &&
+                                            !tip.isCleanupTask() &&
+                                            !tip.isSetupTask(),
                                             TaskCompletionEvent.Status.SUCCEEDED,
                                             httpTaskLogLocation 
                                            );
@@ -698,7 +736,9 @@ class JobInProgress {
         taskEvent = new TaskCompletionEvent(taskCompletionEventTracker, 
                                             taskid,
                                             tip.idWithinJob(),
-                                            status.getIsMap(),
+                                            status.getIsMap() &&
+                                            !tip.isCleanupTask() &&
+                                            !tip.isSetupTask(),
                                             taskCompletionStatus, 
                                             httpTaskLogLocation
                                            );
@@ -727,7 +767,7 @@ class JobInProgress {
                  oldProgress + " to " + tip.getProgress());
     }
     
-    if (!tip.isCleanupTask()) {
+    if (!tip.isCleanupTask() && !tip.isSetupTask()) {
       double progressDelta = tip.getProgress() - oldProgress;
       if (tip.isMapTask()) {
         if (maps.length == 0) {
@@ -860,11 +900,7 @@ class JobInProgress {
     // Now launch the cleanupTask
     Task result = tip.getTaskToRun(tts.getTrackerName());
     if (result != null) {
-      launchedCleanup = true;
-      if (tip.isFirstAttempt(result.getTaskID())) {
-        JobHistory.Task.logStarted(tip.getTIPId(), 
-          Values.CLEANUP.name(), System.currentTimeMillis(), "");
-      }
+      addRunningTaskToTIP(tip, result.getTaskID(), tts, true);
     }
     return result;
   }
@@ -878,8 +914,12 @@ class JobInProgress {
    * @return true/false
    */
   private synchronized boolean canLaunchCleanupTask() {
+    if (!tasksInited.get()) {
+      return false;
+    }
     // check if the job is running
-    if (status.getRunState() != JobStatus.RUNNING) {
+    if (status.getRunState() != JobStatus.RUNNING &&
+        status.getRunState() != JobStatus.PREP) {
       return false;
     }
     // check if cleanup task has been launched already. 
@@ -899,7 +939,62 @@ class JobInProgress {
     }
     return launchCleanupTask;
   }
+
+  /**
+   * Return a SetupTask, if appropriate, to run on the given tasktracker
+   * 
+   */
+  public synchronized Task obtainSetupTask(TaskTrackerStatus tts, 
+                                             int clusterSize, 
+                                             int numUniqueHosts,
+                                             boolean isMapSlot
+                                            ) throws IOException {
+    if (!canLaunchSetupTask()) {
+      return null;
+    }
+    
+    String taskTracker = tts.getTrackerName();
+    // Update the last-known clusterSize
+    this.clusterSize = clusterSize;
+    if (!shouldRunOnTaskTracker(taskTracker)) {
+      return null;
+    }
+    
+    List<TaskInProgress> setupTaskList = new ArrayList<TaskInProgress>();
+    if (isMapSlot) {
+      setupTaskList.add(setup[0]);
+    } else {
+      setupTaskList.add(setup[1]);
+    }
+    TaskInProgress tip = findTaskFromList(setupTaskList,
+                           tts, numUniqueHosts, false);
+    if (tip == null) {
+      return null;
+    }
+    
+    // Now launch the setupTask
+    Task result = tip.getTaskToRun(tts.getTrackerName());
+    if (result != null) {
+      addRunningTaskToTIP(tip, result.getTaskID(), tts, true);
+    }
+    return result;
+  }
+  
+  /**
+   * Check whether setup task can be launched for the job.
+   * 
+   * Setup task can be launched after the tasks are inited
+   * and Job is in PREP state
+   * and if it is not already launched
+   * or job is not Killed/Failed
+   * @return true/false
+   */
+  private synchronized boolean canLaunchSetupTask() {
+    return (tasksInited.get() && status.getRunState() == JobStatus.PREP && 
+           !launchedSetup && !jobKilled && !jobFailed);
+  }
   
+
   /**
    * Return a ReduceTask, if appropriate, to run on the given tasktracker.
    * We don't have cache-sensitivity for reduce tasks, as they
@@ -955,8 +1050,14 @@ class JobInProgress {
                                         boolean isScheduled) {
     // keeping the earlier ordering intact
     String name;
-    Enum counter;
-    if (tip.isMapTask()) {
+    Enum counter = null;
+    if (tip.isSetupTask()) {
+      launchedSetup = true;
+      name = Values.SETUP.name();
+    } else if (tip.isCleanupTask()) {
+      launchedCleanup = true;
+      name = Values.CLEANUP.name();
+    } else if (tip.isMapTask()) {
       ++runningMapTasks;
       name = Values.MAP.name();
       counter = Counter.TOTAL_LAUNCHED_MAPS;
@@ -975,7 +1076,9 @@ class JobInProgress {
       JobHistory.Task.logStarted(tip.getTIPId(), name,
                                  tip.getExecStartTime(), "");
     }
-    jobCounters.incrCounter(counter, 1);
+    if (!tip.isSetupTask() && !tip.isCleanupTask()) {
+      jobCounters.incrCounter(counter, 1);
+    }
     
     // Make an entry in the tip if the attempt is not scheduled i.e externally
     // added
@@ -994,7 +1097,7 @@ class JobInProgress {
     //
     // So to simplify, increment the data locality counter whenever there is 
     // data locality.
-    if (tip.isMapTask()) {
+    if (tip.isMapTask() && !tip.isSetupTask() && !tip.isCleanupTask()) {
       // increment the data locality counter for maps
       Node tracker = jobtracker.getNode(tts.getHost());
       int level = this.maxLevel;
@@ -1648,65 +1751,45 @@ class JobInProgress {
     TaskTrackerStatus ttStatus = 
       this.jobtracker.getTaskTracker(status.getTaskTracker());
     String trackerHostname = jobtracker.getNode(ttStatus.getHost()).toString();
+    String taskType = tip.isCleanupTask() ? Values.CLEANUP.name() :
+                      tip.isSetupTask() ? Values.SETUP.name() :
+                      tip.isMapTask() ? Values.MAP.name() : 
+                      Values.REDUCE.name();
     if (status.getIsMap()){
       JobHistory.MapAttempt.logStarted(status.getTaskID(), status.getStartTime(), 
                                        status.getTaskTracker(), 
                                        ttStatus.getHttpPort(), 
-                                       tip.isCleanupTask()); 
+                                       taskType); 
       JobHistory.MapAttempt.logFinished(status.getTaskID(), status.getFinishTime(), 
-                                        trackerHostname, tip.isCleanupTask(),
+                                        trackerHostname, taskType,
                                         status.getStateString(), 
                                         status.getCounters()); 
-      JobHistory.Task.logFinished(tip.getTIPId(), 
-                                  tip.isCleanupTask() ? Values.CLEANUP.name() :
-                                  Values.MAP.name(), 
-                                  tip.getExecFinishTime(),
-                                  status.getCounters()); 
     }else{
       JobHistory.ReduceAttempt.logStarted( status.getTaskID(), status.getStartTime(), 
                                           status.getTaskTracker(),
                                           ttStatus.getHttpPort(), 
-                                          tip.isCleanupTask()); 
+                                          taskType); 
       JobHistory.ReduceAttempt.logFinished(status.getTaskID(), status.getShuffleFinishTime(),
                                            status.getSortFinishTime(), status.getFinishTime(), 
-                                           trackerHostname, tip.isCleanupTask(), 
+                                           trackerHostname, 
+                                           taskType,
                                            status.getStateString(), 
                                            status.getCounters()); 
-      JobHistory.Task.logFinished(tip.getTIPId(), 
-                                  tip.isCleanupTask() ? Values.CLEANUP.name() :
-                                  Values.REDUCE.name(), tip.getExecFinishTime(),
-                                  status.getCounters()); 
     }
+    JobHistory.Task.logFinished(tip.getTIPId(), 
+                                taskType,
+                                tip.getExecFinishTime(),
+                                status.getCounters()); 
         
     int newNumAttempts = tip.getActiveTasks().size();
-    if (!tip.isCleanupTask()) {
-      if (tip.isMapTask()) {
-        runningMapTasks -= 1;
-        // check if this was a sepculative task
-        if (oldNumAttempts > 1) {
-          speculativeMapTasks -= (oldNumAttempts - newNumAttempts);
-        }
-        finishedMapTasks += 1;
-        metrics.completeMap(taskid);
-        // remove the completed map from the resp running caches
-        retireMap(tip);
-        if ((finishedMapTasks + failedMapTIPs) == (numMapTasks)) {
-          this.status.setMapProgress(1.0f);
-        }
-      } else {
-        runningReduceTasks -= 1;
-        if (oldNumAttempts > 1) {
-          speculativeReduceTasks -= (oldNumAttempts - newNumAttempts);
-        }
-        finishedReduceTasks += 1;
-        metrics.completeReduce(taskid);
-        // remove the completed reduces from the running reducers set
-        retireReduce(tip);
-        if ((finishedReduceTasks + failedReduceTIPs) == (numReduceTasks)) {
-          this.status.setReduceProgress(1.0f);
-        }
-      }
-    } else {
+    if (tip.isSetupTask()) {
+      // setup task has finished. kill the extra setup tip
+      killSetupTip(!tip.isMapTask());
+      // Job can start running now.
+      this.status.setSetupProgress(1.0f);
+      this.status.setRunState(JobStatus.RUNNING);
+      JobHistory.JobInfo.logStarted(profile.getJobID());
+    } else if (tip.isCleanupTask()) {
       // cleanup task has finished. Kill the extra cleanup tip
       if (tip.isMapTask()) {
         // kill the reduce tip
@@ -1730,7 +1813,31 @@ class JobInProgress {
       // The job has been killed/failed/successful
       // JobTracker should cleanup this task
       jobtracker.markCompletedTaskAttempt(status.getTaskTracker(), taskid);
-      return false;
+    } else if (tip.isMapTask()) {
+      runningMapTasks -= 1;
+      // check if this was a sepculative task
+      if (oldNumAttempts > 1) {
+        speculativeMapTasks -= (oldNumAttempts - newNumAttempts);
+      }
+      finishedMapTasks += 1;
+      metrics.completeMap(taskid);
+      // remove the completed map from the resp running caches
+      retireMap(tip);
+      if ((finishedMapTasks + failedMapTIPs) == (numMapTasks)) {
+        this.status.setMapProgress(1.0f);
+      }
+    } else {
+      runningReduceTasks -= 1;
+      if (oldNumAttempts > 1) {
+        speculativeReduceTasks -= (oldNumAttempts - newNumAttempts);
+      }
+      finishedReduceTasks += 1;
+      metrics.completeReduce(taskid);
+      // remove the completed reduces from the running reducers set
+      retireReduce(tip);
+      if ((finishedReduceTasks + failedReduceTIPs) == (numReduceTasks)) {
+        this.status.setReduceProgress(1.0f);
+      }
     }
     
     return true;
@@ -1764,7 +1871,7 @@ class JobInProgress {
     }
   }
   
-  private synchronized void terminateJob(int jobTerminationState) {
+  synchronized void terminateJob(int jobTerminationState) {
     if ((status.getRunState() == JobStatus.RUNNING) ||
         (status.getRunState() == JobStatus.PREP)) {
       if (jobTerminationState == JobStatus.FAILED) {
@@ -1859,6 +1966,8 @@ class JobInProgress {
     if (wasRunning && !isRunning) {
       if (tip.isCleanupTask()) {
         launchedCleanup = false;
+      } else if (tip.isSetupTask()) {
+        launchedSetup = false;
       } else if (tip.isMapTask()) {
         runningMapTasks -= 1;
         // remove from the running queue and put it in the non-running cache
@@ -1896,36 +2005,41 @@ class JobInProgress {
     // update job history
     String taskTrackerName = taskTrackerStatus.getHost();
     long finishTime = status.getFinishTime();
+    String taskType = tip.isCleanupTask() ? Values.CLEANUP.name() :
+                      tip.isSetupTask() ? Values.SETUP.name() :
+                      tip.isMapTask() ? Values.MAP.name() : 
+                      Values.REDUCE.name();
     if (status.getIsMap()) {
       JobHistory.MapAttempt.logStarted(status.getTaskID(), status.getStartTime(), 
           status.getTaskTracker(), taskTrackerStatus.getHttpPort(), 
-          tip.isCleanupTask());
+          taskType);
       if (status.getRunState() == TaskStatus.State.FAILED) {
         JobHistory.MapAttempt.logFailed(status.getTaskID(), finishTime,
-                taskTrackerName, status.getDiagnosticInfo(), tip.isCleanupTask());
+                taskTrackerName, status.getDiagnosticInfo(), 
+                taskType);
       } else {
         JobHistory.MapAttempt.logKilled(status.getTaskID(), finishTime,
                 taskTrackerName, status.getDiagnosticInfo(),
-                tip.isCleanupTask());
+                taskType);
       }
     } else {
       JobHistory.ReduceAttempt.logStarted(status.getTaskID(), status.getStartTime(), 
           status.getTaskTracker(), taskTrackerStatus.getHttpPort(), 
-          tip.isCleanupTask());
+          taskType);
       if (status.getRunState() == TaskStatus.State.FAILED) {
         JobHistory.ReduceAttempt.logFailed(status.getTaskID(), finishTime,
                 taskTrackerName, status.getDiagnosticInfo(), 
-                tip.isCleanupTask());
+                taskType);
       } else {
         JobHistory.ReduceAttempt.logKilled(status.getTaskID(), finishTime,
                 taskTrackerName, status.getDiagnosticInfo(), 
-                tip.isCleanupTask());
+                taskType);
       }
     }
         
     // After this, try to assign tasks with the one after this, so that
     // the failed task goes to the end of the list.
-    if (!tip.isCleanupTask()) {
+    if (!tip.isCleanupTask() && !tip.isSetupTask()) {
       if (tip.isMapTask()) {
         failedMapTasks++;
       } else {
@@ -1955,7 +2069,7 @@ class JobInProgress {
       // Allow upto 'mapFailuresPercent' of map tasks to fail or
       // 'reduceFailuresPercent' of reduce tasks to fail
       //
-      boolean killJob = tip.isCleanupTask() ? true :
+      boolean killJob = tip.isCleanupTask() || tip.isSetupTask() ? true :
                         tip.isMapTask() ? 
             ((++failedMapTIPs*100) > (mapFailuresPercent*numMapTasks)) :
             ((++failedReduceTIPs*100) > (reduceFailuresPercent*numReduceTasks));
@@ -1963,12 +2077,8 @@ class JobInProgress {
       if (killJob) {
         LOG.info("Aborting job " + profile.getJobID());
         JobHistory.Task.logFailed(tip.getTIPId(), 
-                                  tip.isCleanupTask() ?
-                                    Values.CLEANUP.name() :
-                                  tip.isMapTask() ? 
-                                          Values.MAP.name() : 
-                                          Values.REDUCE.name(),  
-                                          status.getFinishTime(), 
+                                  taskType,  
+                                  status.getFinishTime(), 
                                   status.getDiagnosticInfo());
         if (tip.isCleanupTask()) {
           // kill the other tip
@@ -1979,6 +2089,10 @@ class JobInProgress {
           }
           terminateJob(JobStatus.FAILED);
         } else {
+          if (tip.isSetupTask()) {
+            // kill the other tip
+            killSetupTip(!tip.isMapTask());
+          }
           fail();
         }
       }
@@ -1986,7 +2100,7 @@ class JobInProgress {
       //
       // Update the counters
       //
-      if (!tip.isCleanupTask()) {
+      if (!tip.isCleanupTask() && !tip.isSetupTask()) {
         if (tip.isMapTask()) {
           jobCounters.incrCounter(Counter.NUM_FAILED_MAPS, 1);
         } else {
@@ -1996,6 +2110,14 @@ class JobInProgress {
     }
   }
 
+  void killSetupTip(boolean isMap) {
+    if (isMap) {
+      setup[0].kill();
+    } else {
+      setup[1].kill();
+    }
+  }
+
   /**
    * Fail a task with a given reason, but without a status object.
    * @param tip The task's tip
@@ -2018,6 +2140,7 @@ class JobInProgress {
     updateTaskStatus(tip, status, metrics);
     JobHistory.Task.logFailed(tip.getTIPId(), 
                               tip.isCleanupTask() ? Values.CLEANUP.name() : 
+                              tip.isSetupTask() ? Values.SETUP.name() : 
                               tip.isMapTask() ? Values.MAP.name() : Values.REDUCE.name(), 
                               tip.getExecFinishTime(), reason, taskid); 
   }
@@ -2072,18 +2195,24 @@ class JobInProgress {
    */
   public synchronized TaskInProgress getTaskInProgress(TaskID tipid) {
     if (tipid.isMap()) {
-      if (tipid.getId() == maps.length) { // cleanup map tip
+      if (tipid.equals(cleanup[0].getTIPId())) { // cleanup map tip
         return cleanup[0]; 
       }
+      if (tipid.equals(setup[0].getTIPId())) { //setup map tip
+        return setup[0];
+      }
       for (int i = 0; i < maps.length; i++) {
         if (tipid.equals(maps[i].getTIPId())){
           return maps[i];
         }
       }
     } else {
-      if (tipid.getId() == reduces.length) { // cleanup reduce tip
+      if (tipid.equals(cleanup[1].getTIPId())) { // cleanup reduce tip
         return cleanup[1]; 
       }
+      if (tipid.equals(setup[1].getTIPId())) { //setup reduce tip
+        return setup[1];
+      }
       for (int i = 0; i < reduces.length; i++) {
         if (tipid.equals(reduces[i].getTIPId())){
           return reduces[i];

+ 35 - 0
src/mapred/org/apache/hadoop/mapred/JobStatus.java

@@ -52,6 +52,7 @@ public class JobStatus implements Writable {
   private float mapProgress;
   private float reduceProgress;
   private float cleanupProgress;
+  private float setupProgress;
   private int runState;
   private long startTime;
   private String user;
@@ -99,7 +100,25 @@ public class JobStatus implements Writable {
    */
    public JobStatus(JobID jobid, float mapProgress, float reduceProgress,
                       float cleanupProgress, int runState, JobPriority jp) {
+     this(jobid, 0.0f, mapProgress, reduceProgress, 
+          cleanupProgress, runState, jp);
+   }
+   
+  /**
+   * Create a job status object for a given jobid.
+   * @param jobid The jobid of the job
+   * @param setupProgress The progress made on the setup
+   * @param mapProgress The progress made on the maps
+   * @param reduceProgress The progress made on the reduces
+   * @param cleanupProgress The progress made on the cleanup
+   * @param runState The current state of the job
+   * @param jp Priority of the job.
+   */
+   public JobStatus(JobID jobid, float setupProgress, float mapProgress,
+                    float reduceProgress, float cleanupProgress, 
+                    int runState, JobPriority jp) {
      this.jobid = jobid;
+     this.setupProgress = setupProgress;
      this.mapProgress = mapProgress;
      this.reduceProgress = reduceProgress;
      this.cleanupProgress = cleanupProgress;
@@ -110,6 +129,7 @@ public class JobStatus implements Writable {
      }
      priority = jp;
    }
+   
   /**
    * @deprecated use getJobID instead
    */
@@ -147,6 +167,19 @@ public class JobStatus implements Writable {
     this.cleanupProgress = (float) Math.min(1.0, Math.max(0.0, p)); 
   }
 
+  /**
+   * @return Percentage of progress in setup 
+   */
+  public synchronized float setupProgress() { return setupProgress; }
+    
+  /**
+   * Sets the setup progress of this job
+   * @param p The value of setup progress to set to
+   */
+  synchronized void setSetupProgress(float p) { 
+    this.setupProgress = (float) Math.min(1.0, Math.max(0.0, p)); 
+  }
+
   /**
    * @return Percentage of progress in reduce 
    */
@@ -232,6 +265,7 @@ public class JobStatus implements Writable {
   ///////////////////////////////////////
   public synchronized void write(DataOutput out) throws IOException {
     jobid.write(out);
+    out.writeFloat(setupProgress);
     out.writeFloat(mapProgress);
     out.writeFloat(reduceProgress);
     out.writeFloat(cleanupProgress);
@@ -244,6 +278,7 @@ public class JobStatus implements Writable {
 
   public synchronized void readFields(DataInput in) throws IOException {
     this.jobid = JobID.read(in);
+    this.setupProgress = in.readFloat();
     this.mapProgress = in.readFloat();
     this.reduceProgress = in.readFloat();
     this.cleanupProgress = in.readFloat();

+ 8 - 1
src/mapred/org/apache/hadoop/mapred/JobSubmissionProtocol.java

@@ -47,8 +47,10 @@ interface JobSubmissionProtocol extends VersionedProtocol {
    *             and getAllJobs(queue) as a part of HADOOP-3930
    * Version 14: Added setPriority for HADOOP-4124
    * Version 15: Added KILLED status to JobStatus as part of HADOOP-3924            
+   * Version 16: Added getSetupTaskReports and 
+   *             setupProgress to JobStatus as part of HADOOP-4261           
    */
-  public static final long versionID = 15L;
+  public static final long versionID = 16L;
 
   /**
    * Allocate a name for the job.
@@ -122,6 +124,11 @@ interface JobSubmissionProtocol extends VersionedProtocol {
    */
   public TaskReport[] getCleanupTaskReports(JobID jobid) throws IOException;
 
+  /**
+   * Grab a bunch of info on the setup tasks that make up the job
+   */
+  public TaskReport[] getSetupTaskReports(JobID jobid) throws IOException;
+
   /**
    * A MapReduce system always operates on a single filesystem.  This 
    * function returns the fs name.  ('local' if the localfs; 'addr:port' 

+ 47 - 5
src/mapred/org/apache/hadoop/mapred/JobTracker.java

@@ -531,7 +531,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
       // is updated
       private void checkAndInit() throws IOException {
         String jobStatus = this.job.get(Keys.JOB_STATUS);
-        if (Values.RUNNING.name().equals(jobStatus)) {
+        if (Values.PREP.name().equals(jobStatus)) {
           hasUpdates = true;
           LOG.info("Calling init from RM for job " + jip.getJobID().toString());
           jip.initTasks();
@@ -1860,7 +1860,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
       if (taskTrackerStatus == null) {
         LOG.warn("Unknown task tracker polling; ignoring: " + trackerName);
       } else {
-        List<Task> tasks = getCleanupTask(taskTrackerStatus);
+        List<Task> tasks = getSetupAndCleanupTasks(taskTrackerStatus);
         if (tasks == null ) {
           tasks = taskScheduler.assignTasks(taskTrackerStatus);
         }
@@ -2099,8 +2099,9 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
     return null;
   }
   
-  private synchronized List<Task> getCleanupTask(TaskTrackerStatus taskTracker)
-  throws IOException {
+  // returns cleanup tasks first, then setup tasks.
+  private synchronized List<Task> getSetupAndCleanupTasks(
+    TaskTrackerStatus taskTracker) throws IOException {
     int maxMapTasks = taskTracker.getMaxMapTasks();
     int maxReduceTasks = taskTracker.getMaxReduceTasks();
     int numMaps = taskTracker.countMapTasks();
@@ -2120,6 +2121,15 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
             return Collections.singletonList(t);
           }
         }
+        for (Iterator<JobInProgress> it = jobs.values().iterator();
+             it.hasNext();) {
+          JobInProgress job = it.next();
+          t = job.obtainSetupTask(taskTracker, numTaskTrackers,
+                                  numUniqueHosts, true);
+          if (t != null) {
+            return Collections.singletonList(t);
+          }
+        }
       }
       if (numReduces < maxReduceTasks) {
         for (Iterator<JobInProgress> it = jobs.values().iterator();
@@ -2131,6 +2141,15 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
             return Collections.singletonList(t);
           }
         }
+        for (Iterator<JobInProgress> it = jobs.values().iterator();
+             it.hasNext();) {
+          JobInProgress job = it.next();
+          t = job.obtainSetupTask(taskTracker, numTaskTrackers,
+                                    numUniqueHosts, false);
+          if (t != null) {
+            return Collections.singletonList(t);
+          }
+        }
       }
     }
     return null;
@@ -2353,6 +2372,28 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
   
   }
   
+  public synchronized TaskReport[] getSetupTaskReports(JobID jobid) {
+    JobInProgress job = jobs.get(jobid);
+    if (job == null) {
+      return new TaskReport[0];
+    } else {
+      Vector<TaskReport> reports = new Vector<TaskReport>();
+      Vector<TaskInProgress> completeTasks = job.reportSetupTIPs(true);
+      for (Iterator<TaskInProgress> it = completeTasks.iterator();
+           it.hasNext();) {
+        TaskInProgress tip = (TaskInProgress) it.next();
+        reports.add(tip.generateSingleReport());
+      }
+      Vector<TaskInProgress> incompleteTasks = job.reportSetupTIPs(false);
+      for (Iterator<TaskInProgress> it = incompleteTasks.iterator(); 
+           it.hasNext();) {
+        TaskInProgress tip = (TaskInProgress) it.next();
+        reports.add(tip.generateSingleReport());
+      }
+      return reports.toArray(new TaskReport[reports.size()]);
+    }
+  }
+  
   TaskCompletionEvent[] EMPTY_EVENTS = new TaskCompletionEvent[0];
   
   /* 
@@ -2576,7 +2617,8 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
         // And completed maps with zero reducers of the job 
         // never need to be failed. 
         if (!tip.isComplete() || 
-            (tip.isMapTask() && job.desiredReduces() != 0)) {
+            (tip.isMapTask() && !tip.isSetupTask() && 
+             job.desiredReduces() != 0)) {
           // if the job is done, we don't want to change anything
           if (job.getStatus().getRunState() == JobStatus.RUNNING) {
             job.failedTask(tip, taskId, ("Lost task tracker: " + trackerName), 

+ 5 - 0
src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java

@@ -114,6 +114,8 @@ class LocalJobRunner implements JobSubmissionProtocol {
         }
         JobContext jContext = new JobContext(conf);
         OutputCommitter outputCommitter = job.getOutputCommitter();
+        outputCommitter.setupJob(jContext);
+        status.setSetupProgress(1.0f);
         
         DataOutputBuffer buffer = new DataOutputBuffer();
         for (int i = 0; i < splits.length; i++) {
@@ -336,6 +338,9 @@ class LocalJobRunner implements JobSubmissionProtocol {
   public TaskReport[] getCleanupTaskReports(JobID id) {
     return new TaskReport[0];
   }
+  public TaskReport[] getSetupTaskReports(JobID id) {
+    return new TaskReport[0];
+  }
 
   public JobStatus getJobStatus(JobID id) {
     Job job = jobs.get(id);

+ 4 - 0
src/mapred/org/apache/hadoop/mapred/MapTask.java

@@ -283,6 +283,10 @@ class MapTask extends Task {
       runCleanup(umbilical);
       return;
     }
+    if (setupJob) {
+      runSetupJob(umbilical);
+      return;
+    }
 
     int numReduceTasks = conf.getNumReduceTasks();
     LOG.info("numReduceTasks: " + numReduceTasks);

+ 1 - 5
src/mapred/org/apache/hadoop/mapred/OutputCommitter.java

@@ -30,12 +30,10 @@ import java.io.IOException;
  *   <li>
  *   Setup the job during initialization. For example, create the temporary 
  *   output directory for the job during the initialization of the job.
- *   The job client does the setup for the job.
  *   </li>
  *   <li>
  *   Cleanup the job after the job completion. For example, remove the
- *   temporary output directory after the job completion. CleanupJob is done
- *   by a separate task at the end of the job.
+ *   temporary output directory after the job completion. 
  *   </li>
  *   <li>
  *   Setup the task temporary output.
@@ -61,8 +59,6 @@ public abstract class OutputCommitter {
   /**
    * For the framework to setup the job output during initialization
    * 
-   * The job client does the setup for the job.
-   *   
    * @param jobContext Context of the job whose output is being written.
    * @throws IOException if temporary output could not be created
    */

+ 5 - 1
src/mapred/org/apache/hadoop/mapred/ReduceTask.java

@@ -336,7 +336,7 @@ class ReduceTask extends Task {
     job.setBoolean("mapred.skip.on", isSkipping());
     Reducer reducer = ReflectionUtils.newInstance(job.getReducerClass(), job);
 
-    if (!cleanupJob) {
+    if (!cleanupJob && !setupJob) {
       copyPhase = getProgress().addPhase("copy");
       sortPhase  = getProgress().addPhase("sort");
       reducePhase = getProgress().addPhase("reduce");
@@ -351,6 +351,10 @@ class ReduceTask extends Task {
       runCleanup(umbilical);
       return;
     }
+    if (setupJob) {
+      runSetupJob(umbilical);
+      return;
+    }
     
     // Initialize the codec
     codec = initCodec();

+ 9 - 0
src/mapred/org/apache/hadoop/mapred/RunningJob.java

@@ -93,6 +93,15 @@ public interface RunningJob {
    */
   public float cleanupProgress() throws IOException;
 
+  /**
+   * Get the <i>progress</i> of the job's setup-tasks, as a float between 0.0 
+   * and 1.0.  When all setup tasks have completed, the function returns 1.0.
+   * 
+   * @return the progress of the job's setup-tasks.
+   * @throws IOException
+   */
+  public float setupProgress() throws IOException;
+
   /**
    * Check if the job is finished or not. 
    * This is a non-blocking call.

+ 14 - 0
src/mapred/org/apache/hadoop/mapred/Task.java

@@ -108,6 +108,7 @@ abstract class Task implements Writable, Configurable {
   private int partition;                          // id within job
   TaskStatus taskStatus;                          // current status of the task
   protected boolean cleanupJob = false;
+  protected boolean setupJob = false;
   private Thread pingProgressThread;
   
   //skip ranges based on failed ranges from previous attempts
@@ -241,6 +242,9 @@ abstract class Task implements Writable, Configurable {
     cleanupJob = true;
   }
 
+  public void setSetupTask() {
+    setupJob = true; 
+  }
   ////////////////////////////////////////////
   // Writable methods
   ////////////////////////////////////////////
@@ -253,6 +257,7 @@ abstract class Task implements Writable, Configurable {
     skipRanges.write(out);
     out.writeBoolean(skipping);
     out.writeBoolean(cleanupJob);
+    out.writeBoolean(setupJob);
     out.writeBoolean(writeSkipRecs);
   }
   public void readFields(DataInput in) throws IOException {
@@ -266,6 +271,7 @@ abstract class Task implements Writable, Configurable {
     currentRecStartIndex = currentRecIndexIterator.next();
     skipping = in.readBoolean();
     cleanupJob = in.readBoolean();
+    setupJob = in.readBoolean();
     writeSkipRecs = in.readBoolean();
   }
 
@@ -718,6 +724,14 @@ abstract class Task implements Writable, Configurable {
     conf.getOutputCommitter().cleanupJob(jobContext);
     done(umbilical);
   }
+
+  protected void runSetupJob(TaskUmbilicalProtocol umbilical) 
+  throws IOException {
+    // do the setup
+    getProgress().setStatus("setup");
+    conf.getOutputCommitter().setupJob(jobContext);
+    done(umbilical);
+  }
   
   public void setConf(Configuration conf) {
     if (conf instanceof JobConf) {

+ 17 - 4
src/mapred/org/apache/hadoop/mapred/TaskInProgress.java

@@ -83,6 +83,7 @@ class TaskInProgress {
   private FailedRanges failedRanges = new FailedRanges();
   private volatile boolean skipping = false;
   private boolean cleanup = false; 
+  private boolean setup = false;
    
   // The 'next' usable taskid of this tip
   int nextTaskId = 0;
@@ -180,7 +181,15 @@ class TaskInProgress {
   public void setCleanupTask() {
     cleanup = true;
   }
-  
+
+  public boolean isSetupTask() {
+    return setup;
+  }
+	  
+  public void setSetupTask() {
+    setup = true;
+  }
+
   public boolean isOnlyCommitPending() {
     for (TaskStatus t : taskStatuses.values()) {
       if (t.getRunState() == TaskStatus.State.COMMIT_PENDING) {
@@ -380,7 +389,8 @@ class TaskInProgress {
         (job.getStatus().getRunState() != JobStatus.RUNNING)) {
       tasksReportedClosed.add(taskid);
       close = true;
-    } else if (isComplete() && !(isMapTask() && isComplete(taskid)) &&
+    } else if (isComplete() && 
+               !(isMapTask() && !setup && !cleanup && isComplete(taskid)) &&
                !tasksReportedClosed.contains(taskid)) {
       tasksReportedClosed.add(taskid);
       close = true; 
@@ -565,7 +575,7 @@ class TaskInProgress {
     // should note this failure only for completed maps, only if this taskid;
     // completed this map. however if the job is done, there is no need to 
     // manipulate completed maps
-    if (this.isMapTask() && isComplete(taskid) && 
+    if (this.isMapTask() && !setup && !cleanup && isComplete(taskid) && 
         jobStatus.getRunState() != JobStatus.SUCCEEDED) {
       this.completes--;
       
@@ -850,6 +860,9 @@ class TaskInProgress {
     if (cleanup) {
       t.setCleanupTask();
     }
+    if (setup) {
+      t.setSetupTask();
+    }
     t.setConf(conf);
     LOG.debug("Launching task with skipRanges:"+failedRanges.getSkipRanges());
     t.setSkipRanges(failedRanges.getSkipRanges());
@@ -951,7 +964,7 @@ class TaskInProgress {
   }
 
   public long getMapInputSize() {
-    if(isMapTask()) {
+    if(isMapTask() && !setup && !cleanup) {
       return rawSplit.getDataLength();
     } else {
       return 0;

+ 2 - 2
src/test/org/apache/hadoop/mapred/MiniMRCluster.java

@@ -464,9 +464,9 @@ public class MiniMRCluster {
   }
     
   /**
-   * Get the map task completion events
+   * Get the task completion events
    */
-  public TaskCompletionEvent[] getMapTaskCompletionEvents(JobID id, int from, 
+  public TaskCompletionEvent[] getTaskCompletionEvents(JobID id, int from, 
                                                           int max) 
   throws IOException {
     return jobTracker.getJobTracker().getTaskCompletionEvents(id, from, max);

+ 9 - 6
src/test/org/apache/hadoop/mapred/TestJobTrackerRestart.java

@@ -471,8 +471,9 @@ public class TestJobTrackerRestart extends TestCase {
     }
     
     TaskCompletionEvent[] prevEvents = 
-      mr.getMapTaskCompletionEvents(id, 0, numMaps);
-    TaskReport[] prevReports = jobClient.getMapTaskReports(id);
+      mr.getTaskCompletionEvents(id, 0, numMaps);
+    TaskReport[] prevSetupReports = jobClient.getSetupTaskReports(id);
+    TaskReport[] prevMapReports = jobClient.getMapTaskReports(id);
     ClusterStatus prevStatus = jobClient.getClusterStatus();
     
     mr.stopJobTracker();
@@ -502,7 +503,7 @@ public class TestJobTrackerRestart extends TestCase {
     
     // Get the new jobtrackers events
     TaskCompletionEvent[] jtEvents =  
-      mr.getMapTaskCompletionEvents(id, 0, 2 * numMaps);
+      mr.getTaskCompletionEvents(id, 0, 2 * numMaps);
     
     // Test if all the events that were recovered match exactly
     testTaskCompletionEvents(prevEvents, jtEvents, false, numToMatch);
@@ -521,8 +522,10 @@ public class TestJobTrackerRestart extends TestCase {
     
     // Check the task reports
     // The reports should match exactly if the attempts are same
-    TaskReport[] afterReports = jobClient.getMapTaskReports(id);
-    testTaskReports(prevReports, afterReports, numToMatch);
+    TaskReport[] afterMapReports = jobClient.getMapTaskReports(id);
+    TaskReport[] afterSetupReports = jobClient.getSetupTaskReports(id);
+    testTaskReports(prevMapReports, afterMapReports, numToMatch - 1);
+    testTaskReports(prevSetupReports, afterSetupReports, 1);
     
     //  Signal the reduce tasks
     signalTasks(dfs, fileSys, false, getMapSignalFile(shareDir), 
@@ -829,4 +832,4 @@ public class TestJobTrackerRestart extends TestCase {
   public static void main(String[] args) throws IOException {
     new TestJobTrackerRestart().testJobTrackerRestart();
   }
-}
+}

+ 4 - 1
src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java

@@ -70,7 +70,10 @@ public class TestMiniMRLocalFS extends TestCase {
       // test the task report fetchers
       JobClient client = new JobClient(job);
       JobID jobid = ret.job.getID();
-      TaskReport[] reports = client.getMapTaskReports(jobid);
+      TaskReport[] reports;
+      reports = client.getSetupTaskReports(jobid);
+      assertEquals("number of setups", 2, reports.length);
+      reports = client.getMapTaskReports(jobid);
       assertEquals("number of maps", 1, reports.length);
       reports = client.getReduceTaskReports(jobid);
       assertEquals("number of reduces", 1, reports.length);

+ 6 - 3
src/webapps/job/jobdetails.jsp

@@ -88,15 +88,15 @@
               "</td></tr>\n");
   }
 
-  private void printCleanupTaskSummary(JspWriter out,
+  private void printJobLevelTaskSummary(JspWriter out,
                                 String jobId,
+                                String kind,
                                 TaskInProgress[] tasks
                                ) throws IOException {
     int totalTasks = tasks.length;
     int runningTasks = 0;
     int finishedTasks = 0;
     int killedTasks = 0;
-    String kind = "cleanup";
     for(int i=0; i < totalTasks; ++i) {
       TaskInProgress task = tasks[i];
       if (task.isComplete()) {
@@ -208,6 +208,9 @@
     out.print("<b>Job Name:</b> " + profile.getJobName() + "<br>\n");
     out.print("<b>Job File:</b> <a href=\"jobconf.jsp?jobid=" + jobId + "\">" 
               + profile.getJobFile() + "</a><br>\n");
+    out.print("<b>Job Setup:</b>");
+    printJobLevelTaskSummary(out, jobId, "setup", job.getSetupTasks());
+    out.print("<br>\n");
     if (runState == JobStatus.RUNNING) {
       out.print("<b>Status:</b> Running<br>\n");
       out.print("<b>Started at:</b> " + new Date(job.getStartTime()) + "<br>\n");
@@ -238,7 +241,7 @@
       }
     }
     out.print("<b>Job Cleanup:</b>");
-    printCleanupTaskSummary(out, jobId, job.getCleanupTasks());
+    printJobLevelTaskSummary(out, jobId, "cleanup", job.getCleanupTasks());
     out.print("<br>\n");
     if (flakyTaskTrackers > 0) {
       out.print("<b>Black-listed TaskTrackers:</b> " + 

+ 34 - 0
src/webapps/job/jobdetailshistory.jsp

@@ -42,6 +42,7 @@
     int totalMaps = 0 ; 
     int totalReduces = 0;
     int totalCleanups = 0; 
+    int totalSetups = 0; 
     int numFailedMaps = 0; 
     int numKilledMaps = 0;
     int numFailedReduces = 0 ; 
@@ -49,6 +50,9 @@
     int numFinishedCleanups = 0;
     int numFailedCleanups = 0;
     int numKilledCleanups = 0;
+    int numFinishedSetups = 0;
+    int numFailedSetups = 0;
+    int numKilledSetups = 0;
 	
     long mapStarted = 0 ; 
     long mapFinished = 0 ; 
@@ -56,6 +60,8 @@
     long reduceFinished = 0;
     long cleanupStarted = 0;
     long cleanupFinished = 0; 
+    long setupStarted = 0;
+    long setupFinished = 0; 
         
     Map <String,String> allHosts = new TreeMap<String,String>();
     for (JobHistory.Task task : tasks.values()) {
@@ -104,6 +110,21 @@
             numFailedCleanups++;
           } else if (Values.KILLED.name().equals(attempt.get(Keys.TASK_STATUS))) {
             numKilledCleanups++;
+          } 
+        } else if (Values.SETUP.name().equals(task.get(Keys.TASK_TYPE))) {
+          if (setupStarted==0||setupStarted > startTime) {
+            setupStarted = startTime ; 
+          }
+          if (setupFinished < finishTime) {
+            setupFinished = finishTime; 
+          }
+          totalSetups++; 
+          if (Values.SUCCESS.name().equals(attempt.get(Keys.TASK_STATUS))) {
+            numFinishedSetups++;
+          } else if (Values.FAILED.name().equals(attempt.get(Keys.TASK_STATUS))) {
+            numFailedSetups++;
+          } else if (Values.KILLED.name().equals(attempt.get(Keys.TASK_STATUS))) {
+            numKilledSetups++;
           }
         }
       }
@@ -117,6 +138,19 @@
 <td>Kind</td><td>Total Tasks(successful+failed+killed)</td><td>Successful tasks</td><td>Failed tasks</td><td>Killed tasks</td><td>Start Time</td><td>Finish Time</td>
 </tr>
 <tr>
+<td>Setup</td>
+    <td><a href="jobtaskshistory.jsp?jobid=<%=jobid %>&logFile=<%=encodedLogFileName%>&taskType=<%=Values.SETUP.name() %>&status=all">
+        <%=totalSetups%></a></td>
+    <td><a href="jobtaskshistory.jsp?jobid=<%=jobid %>&logFile=<%=encodedLogFileName%>&taskType=<%=Values.SETUP.name() %>&status=<%=Values.SUCCESS %>">
+        <%=numFinishedSetups%></a></td>
+    <td><a href="jobtaskshistory.jsp?jobid=<%=jobid %>&logFile=<%=encodedLogFileName%>&taskType=<%=Values.SETUP.name() %>&status=<%=Values.FAILED %>">
+        <%=numFailedSetups%></a></td>
+    <td><a href="jobtaskshistory.jsp?jobid=<%=jobid %>&logFile=<%=encodedLogFileName%>&taskType=<%=Values.SETUP.name() %>&status=<%=Values.KILLED %>">
+        <%=numKilledSetups%></a></td>  
+    <td><%=StringUtils.getFormattedTimeWithDiff(dateFormat, setupStarted, 0) %></td>
+    <td><%=StringUtils.getFormattedTimeWithDiff(dateFormat, setupFinished, setupStarted) %></td>
+</tr>
+<tr>
 <td>Map</td>
     <td><a href="jobtaskshistory.jsp?jobid=<%=jobid %>&logFile=<%=encodedLogFileName%>&taskType=<%=Values.MAP.name() %>&status=all">
         <%=totalMaps %></a></td>

+ 5 - 2
src/webapps/job/jobtasks.jsp

@@ -21,7 +21,7 @@
   }
   String type = request.getParameter("type");
   String pagenum = request.getParameter("pagenum");
-  TaskInProgress[] tasks;
+  TaskInProgress[] tasks = null;
   String state = request.getParameter("state");
   state = (state!=null) ? state : "all";
   int pnum = Integer.parseInt(pagenum);
@@ -42,9 +42,12 @@
   else if ("reduce".equals(type)) {
     reports = (job != null) ? tracker.getReduceTaskReports(jobidObj) : null;
     tasks = (job != null) ? job.getReduceTasks() : null;
-  } else {
+  } else if ("cleanup".equals(type)) {
     reports = (job != null) ? tracker.getCleanupTaskReports(jobidObj) : null;
     tasks = (job != null) ? job.getCleanupTasks() : null;
+  } else if ("setup".equals(type)) {
+    reports = (job != null) ? tracker.getSetupTaskReports(jobidObj) : null;
+    tasks = (job != null) ? job.getSetupTasks() : null;
   }
 %>
 

+ 7 - 4
src/webapps/job/taskdetails.jsp

@@ -69,9 +69,12 @@
     }
     TaskStatus[] ts = (job != null) ? tracker.getTaskStatuses(tipidObj)
         : null;
-    boolean isCleanup = false;
+    boolean isCleanupOrSetup = false;
     if (tipidObj != null) { 
-      isCleanup = job.getTaskInProgress(tipidObj).isCleanupTask();
+      isCleanupOrSetup = job.getTaskInProgress(tipidObj).isCleanupTask();
+      if (!isCleanupOrSetup) {
+        isCleanupOrSetup = job.getTaskInProgress(tipidObj).isSetupTask();
+      }
     }
 %>
 
@@ -98,7 +101,7 @@
 <table border=2 cellpadding="5" cellspacing="2">
 <tr><td align="center">Task Attempts</td><td>Machine</td><td>Status</td><td>Progress</td><td>Start Time</td> 
   <%
-   if (!ts[0].getIsMap() && !isCleanup) {
+   if (!ts[0].getIsMap() && !isCleanupOrSetup) {
    %>
 <td>Shuffle Finished</td><td>Sort Finished</td>
   <%
@@ -126,7 +129,7 @@
         out.print("<td>"
           + StringUtils.getFormattedTimeWithDiff(dateFormat, status
           .getStartTime(), 0) + "</td>");
-        if (!ts[i].getIsMap() && !isCleanup) {
+        if (!ts[i].getIsMap() && !isCleanupOrSetup) {
           out.print("<td>"
           + StringUtils.getFormattedTimeWithDiff(dateFormat, status
           .getShuffleFinishTime(), status.getStartTime()) + "</td>");

部分文件因文件數量過多而無法顯示