Browse source

HADOOP-3150. Moves task promotion to tasks. Defines a new interface for committing output files. Moves job setup to jobclient, and moves jobcleanup to a separate task. Contributed by Amareshwari Sriramadasu.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/trunk@692408 13f79535-47bb-0310-9956-ffa450edef68
Devaraj Das, 17 years ago
Parent commit: 042e0ac022
40 files changed, with 1541 additions and 601 deletions
  1. CHANGES.txt (+4, -0)
  2. src/docs/src/documentation/content/xdocs/mapred_tutorial.xml (+47, -4)
  3. src/docs/src/documentation/content/xdocs/site.xml (+1, -0)
  4. src/mapred/org/apache/hadoop/mapred/CommitTaskAction.java (+53, -0)
  5. src/mapred/org/apache/hadoop/mapred/FileOutputCommitter.java (+209, -0)
  6. src/mapred/org/apache/hadoop/mapred/FileOutputFormat.java (+15, -15)
  7. src/mapred/org/apache/hadoop/mapred/InterTrackerProtocol.java (+2, -1)
  8. src/mapred/org/apache/hadoop/mapred/IsolationRunner.java (+10, -1)
  9. src/mapred/org/apache/hadoop/mapred/JobClient.java (+28, -1)
  10. src/mapred/org/apache/hadoop/mapred/JobConf.java (+22, -0)
  11. src/mapred/org/apache/hadoop/mapred/JobContext.java (+53, -0)
  12. src/mapred/org/apache/hadoop/mapred/JobInProgress.java (+239, -117)
  13. src/mapred/org/apache/hadoop/mapred/JobQueueTaskScheduler.java (+53, -14)
  14. src/mapred/org/apache/hadoop/mapred/JobStatus.java (+32, -1)
  15. src/mapred/org/apache/hadoop/mapred/JobSubmissionProtocol.java (+8, -1)
  16. src/mapred/org/apache/hadoop/mapred/JobTracker.java (+54, -194)
  17. src/mapred/org/apache/hadoop/mapred/LimitTasksPerJobTaskScheduler.java (+17, -5)
  18. src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java (+25, -49)
  19. src/mapred/org/apache/hadoop/mapred/MRConstants.java (+0, -5)
  20. src/mapred/org/apache/hadoop/mapred/MapTask.java (+7, -1)
  21. src/mapred/org/apache/hadoop/mapred/OutputCommitter.java (+117, -0)
  22. src/mapred/org/apache/hadoop/mapred/ReduceTask.java (+8, -2)
  23. src/mapred/org/apache/hadoop/mapred/RunningJob.java (+9, -0)
  24. src/mapred/org/apache/hadoop/mapred/Task.java (+171, -142)
  25. src/mapred/org/apache/hadoop/mapred/TaskAttemptContext.java (+56, -0)
  26. src/mapred/org/apache/hadoop/mapred/TaskInProgress.java (+44, -1)
  27. src/mapred/org/apache/hadoop/mapred/TaskRunner.java (+1, -1)
  28. src/mapred/org/apache/hadoop/mapred/TaskStatus.java (+5, -3)
  29. src/mapred/org/apache/hadoop/mapred/TaskTracker.java (+59, -29)
  30. src/mapred/org/apache/hadoop/mapred/TaskTrackerAction.java (+9, -1)
  31. src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java (+4, -2)
  32. src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java (+21, -3)
  33. src/test/org/apache/hadoop/mapred/TestFileOutputCommitter.java (+93, -0)
  34. src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java (+2, -0)
  35. src/test/org/apache/hadoop/mapred/TestMultipleTextOutputFormat.java (+5, -2)
  36. src/test/org/apache/hadoop/mapred/TestQueueManager.java (+7, -0)
  37. src/test/org/apache/hadoop/mapred/TestSequenceFileAsBinaryOutputFormat.java (+4, -3)
  38. src/test/org/apache/hadoop/mapred/TestTextOutputFormat.java (+5, -2)
  39. src/webapps/job/jobdetails.jsp (+38, -0)
  40. src/webapps/job/jobtasks.jsp (+4, -1)

+ 4 - 0
CHANGES.txt

@@ -47,6 +47,10 @@ Trunk (unreleased changes)
     so increment the InterTrackerProtocol version. (Hemanth Yamijala via 
     omalley)
 
+    HADOOP-3150. Moves task promotion to tasks. Defines a new interface for
+    committing output files. Moves job setup to jobclient, and moves jobcleanup
+    to a separate task. (Amareshwari Sriramadasu via ddas)
+
   NEW FEATURES
 
     HADOOP-3341. Allow streaming jobs to specify the field separator for map

+ 47 - 4
src/docs/src/documentation/content/xdocs/mapred_tutorial.xml

@@ -715,7 +715,8 @@
       <p>We will then discuss other core interfaces including 
       <code>JobConf</code>, <code>JobClient</code>, <code>Partitioner</code>, 
       <code>OutputCollector</code>, <code>Reporter</code>, 
-      <code>InputFormat</code>, <code>OutputFormat</code> and others.</p>
+      <code>InputFormat</code>, <code>OutputFormat</code>,
+      <code>OutputCommitter</code> and others.</p>
       
       <p>Finally, we will wrap up by discussing some useful features of the
       framework such as the <code>DistributedCache</code>, 
@@ -1013,8 +1014,9 @@
  
         <p><code>JobConf</code> is typically used to specify the 
         <code>Mapper</code>, combiner (if any), <code>Partitioner</code>, 
-        <code>Reducer</code>, <code>InputFormat</code> and 
-        <code>OutputFormat</code> implementations. <code>JobConf</code> also 
+        <code>Reducer</code>, <code>InputFormat</code>, 
+        <code>OutputFormat</code> and <code>OutputCommitter</code> 
+        implementations. <code>JobConf</code> also 
         indicates the set of input files 
         (<a href="ext:api/org/apache/hadoop/mapred/fileinputformat/setinputpaths">setInputPaths(JobConf, Path...)</a>
         /<a href="ext:api/org/apache/hadoop/mapred/fileinputformat/addinputpath">addInputPath(JobConf, Path)</a>)
@@ -1428,6 +1430,45 @@
  
         <p><code>TextOutputFormat</code> is the default 
         <code>OutputFormat</code>.</p>
+
+        <section>
+        <title>OutputCommitter</title>
+        
+        <p><a href="ext:api/org/apache/hadoop/mapred/outputcommitter">
+        OutputCommitter</a> describes the commit of task output for a 
+        Map/Reduce job.</p>
+
+        <p>The Map/Reduce framework relies on the <code>OutputCommitter</code>
+        of the job to:</p>
+        <ol>
+          <li>
+            Set up the job during initialization. For example, create
+            the job's temporary output directory. The job client performs
+            this setup for the job.
+          </li>
+          <li>
+            Clean up the job after completion. For example, remove the
+            temporary output directory. A separate task runs cleanupJob
+            at the end of the job.
+          </li>
+          <li>
+            Set up the task's temporary output.
+          </li> 
+          <li>
+            Check whether a task needs a commit, so that the commit
+            procedure is skipped for tasks with nothing to commit.
+          </li>
+          <li>
+            Commit the task output. 
+          </li> 
+          <li>
+            Discard the task commit.
+          </li>
+        </ol>
+        <p><code>FileOutputCommitter</code> is the default 
+        <code>OutputCommitter</code>.</p>
+        </section>
  
         <section>
           <title>Task Side-Effect Files</title>
@@ -1443,7 +1484,9 @@
           (using the attemptid, say <code>attempt_200709221812_0001_m_000000_0</code>), 
           not just per task.</p> 
  
-          <p>To avoid these issues the Map/Reduce framework maintains a special 
+          <p>To avoid these issues the Map/Reduce framework, when the 
+          <code>OutputCommitter</code> is <code>FileOutputCommitter</code>, 
+          maintains a special 
           <code>${mapred.output.dir}/_temporary/_${taskid}</code> sub-directory
           accessible via <code>${mapred.work.output.dir}</code>
           for each task-attempt on the <code>FileSystem</code> where the output
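
The six responsibilities enumerated in the new tutorial section above correspond one-to-one to the methods on the OutputCommitter class introduced by this commit. As a minimal sketch (the class name NullOutputCommitter is illustrative; the method signatures follow FileOutputCommitter as it appears in this diff), a committer that never has output to promote could look like:

  import java.io.IOException;
  import org.apache.hadoop.mapred.JobContext;
  import org.apache.hadoop.mapred.OutputCommitter;
  import org.apache.hadoop.mapred.TaskAttemptContext;

  /** Illustrative no-op committer: it never has task output to promote. */
  public class NullOutputCommitter extends OutputCommitter {
    public void setupJob(JobContext context) throws IOException {}    // run by the job client
    public void cleanupJob(JobContext context) throws IOException {}  // run by the separate cleanup task
    public void setupTask(TaskAttemptContext context) throws IOException {}
    public boolean needsTaskCommit(TaskAttemptContext context) throws IOException {
      return false;  // no output, so the framework skips the commit handshake
    }
    public void commitTask(TaskAttemptContext context) throws IOException {}
    public void abortTask(TaskAttemptContext context) throws IOException {}
  }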

+ 1 - 0
src/docs/src/documentation/content/xdocs/site.xml

@@ -190,6 +190,7 @@ See http://forrest.apache.org/docs/linking.html for more info.
               <outputcollector href="OutputCollector.html">
                 <collect href="#collect(K, V)" />
               </outputcollector>
+              <outputcommitter href="OutputCommitter.html" />
               <outputformat href="OutputFormat.html" />
               <outputlogfilter href="OutputLogFilter.html" />
               <sequencefileoutputformat href="SequenceFileOutputFormat.html">

+ 53 - 0
src/mapred/org/apache/hadoop/mapred/CommitTaskAction.java

@@ -0,0 +1,53 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * Represents a directive from the {@link org.apache.hadoop.mapred.JobTracker} 
+ * to the {@link org.apache.hadoop.mapred.TaskTracker} to commit the output
+ * of the task.
+ * 
+ */
+class CommitTaskAction extends TaskTrackerAction {
+  private TaskAttemptID taskId;
+  
+  public CommitTaskAction() {
+    super(ActionType.COMMIT_TASK);
+  }
+  
+  public CommitTaskAction(TaskAttemptID taskId) {
+    super(ActionType.COMMIT_TASK);
+    this.taskId = taskId;
+  }
+  
+  public TaskAttemptID getTaskID() {
+    return taskId;
+  }
+  
+  public void write(DataOutput out) throws IOException {
+    taskId.write(out);
+  }
+
+  public void readFields(DataInput in) throws IOException {
+    taskId = TaskAttemptID.read(in);
+  }
+}
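
On the receiving side, the TaskTracker finds this action in its heartbeat response and records that the attempt is cleared to commit. A rough sketch of that dispatch (getActions() is the heartbeat response accessor; the commitResponses set standing in for the tracker's bookkeeping is an assumption about its internals):

  // Sketch: tracker-side handling of a CommitTaskAction from a heartbeat.
  TaskTrackerAction[] actions = heartbeatResponse.getActions();
  if (actions != null) {
    for (TaskTrackerAction action : actions) {
      if (action instanceof CommitTaskAction) {
        CommitTaskAction commitAction = (CommitTaskAction) action;
        // Remember that this attempt may commit; the task's canCommit()
        // poll over the umbilical protocol will now succeed.
        commitResponses.add(commitAction.getTaskID());
      }
    }
  }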

+ 209 - 0
src/mapred/org/apache/hadoop/mapred/FileOutputCommitter.java

@@ -0,0 +1,209 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.net.URI;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.StringUtils;
+
+/** An {@link OutputCommitter} that commits files specified 
+ * in the job output directory, i.e. ${mapred.output.dir}. 
+ **/
+public class FileOutputCommitter extends OutputCommitter {
+
+  public static final Log LOG = LogFactory.getLog(
+      "org.apache.hadoop.mapred.FileOutputCommitter");
+  /**
+   * Temporary directory name 
+   */
+  public static final String TEMP_DIR_NAME = "_temporary";
+
+  public void setupJob(JobContext context) throws IOException {
+    JobConf conf = context.getJobConf();
+    Path outputPath = FileOutputFormat.getOutputPath(conf);
+    if (outputPath != null) {
+      Path tmpDir = new Path(outputPath, FileOutputCommitter.TEMP_DIR_NAME);
+      FileSystem fileSys = tmpDir.getFileSystem(conf);
+      if (!fileSys.mkdirs(tmpDir)) {
+        LOG.error("Mkdirs failed to create " + tmpDir.toString());
+      }
+    }
+  }
+
+  public void cleanupJob(JobContext context) throws IOException {
+    JobConf conf = context.getJobConf();
+    // do the clean up of temporary directory
+    Path outputPath = FileOutputFormat.getOutputPath(conf);
+    if (outputPath != null) {
+      Path tmpDir = new Path(outputPath, FileOutputCommitter.TEMP_DIR_NAME);
+      FileSystem fileSys = tmpDir.getFileSystem(conf);
+      context.getProgressible().progress();
+      if (fileSys.exists(tmpDir)) {
+        fileSys.delete(tmpDir, true);
+      }
+    }
+  }
+
+  public void setupTask(TaskAttemptContext context) throws IOException {
+    // FileOutputCommitter's setupTask doesn't do anything. Because the
+    // temporary task directory is created on demand when the 
+    // task is writing.
+  }
+		  
+  public void commitTask(TaskAttemptContext context) 
+  throws IOException {
+    Path taskOutputPath = getTempTaskOutputPath(context);
+    TaskAttemptID attemptId = context.getTaskAttemptID();
+    JobConf job = context.getJobConf();
+    if (taskOutputPath != null) {
+      FileSystem fs = taskOutputPath.getFileSystem(job);
+      context.getProgressible().progress();
+      if (fs.exists(taskOutputPath)) {
+        Path jobOutputPath = taskOutputPath.getParent().getParent();
+        // Move the task outputs to their final place
+        moveTaskOutputs(context, fs, jobOutputPath, taskOutputPath);
+        // Delete the temporary task-specific output directory
+        if (!fs.delete(taskOutputPath, true)) {
+          LOG.info("Failed to delete the temporary output" + 
+          " directory of task: " + attemptId + " - " + taskOutputPath);
+        }
+        LOG.info("Saved output of task '" + attemptId + "' to " + 
+                 jobOutputPath);
+      }
+    }
+  }
+		  
+  private void moveTaskOutputs(TaskAttemptContext context,
+                               FileSystem fs,
+                               Path jobOutputDir,
+                               Path taskOutput) 
+  throws IOException {
+    TaskAttemptID attemptId = context.getTaskAttemptID();
+    context.getProgressible().progress();
+    if (fs.isFile(taskOutput)) {
+      Path finalOutputPath = getFinalPath(jobOutputDir, taskOutput, 
+                                          getTempTaskOutputPath(context));
+      if (!fs.rename(taskOutput, finalOutputPath)) {
+        if (!fs.delete(finalOutputPath, true)) {
+          throw new IOException("Failed to delete earlier output of task: " + 
+                                 attemptId);
+        }
+        if (!fs.rename(taskOutput, finalOutputPath)) {
+          throw new IOException("Failed to save output of task: " + 
+        		  attemptId);
+        }
+      }
+      LOG.debug("Moved " + taskOutput + " to " + finalOutputPath);
+    } else if(fs.getFileStatus(taskOutput).isDir()) {
+      FileStatus[] paths = fs.listStatus(taskOutput);
+      Path finalOutputPath = getFinalPath(jobOutputDir, taskOutput, 
+	          getTempTaskOutputPath(context));
+      fs.mkdirs(finalOutputPath);
+      if (paths != null) {
+        for (FileStatus path : paths) {
+          moveTaskOutputs(context, fs, jobOutputDir, path.getPath());
+        }
+      }
+    }
+  }
+
+  public void abortTask(TaskAttemptContext context) {
+    Path taskOutputPath =  getTempTaskOutputPath(context);
+    try {
+      FileSystem fs = taskOutputPath.getFileSystem(context.getJobConf());
+      context.getProgressible().progress();
+      fs.delete(taskOutputPath, true);
+    } catch (IOException ie) {
+      LOG.warn("Error discarding output" + StringUtils.stringifyException(ie));
+    }
+  }
+
+  private Path getFinalPath(Path jobOutputDir, Path taskOutput, 
+                            Path taskOutputPath) {
+    URI relativePath = taskOutputPath.toUri().relativize(taskOutput.toUri());
+    if (relativePath.getPath().length() > 0) {
+      return new Path(jobOutputDir, relativePath.getPath());
+    } else {
+      return jobOutputDir;
+    }
+  }
+
+  public boolean needsTaskCommit(TaskAttemptContext context) 
+  throws IOException {
+    try {
+      Path taskOutputPath = getTempTaskOutputPath(context);
+      if (taskOutputPath != null) {
+        context.getProgressible().progress();
+        // Get the file-system for the task output directory
+        FileSystem fs = taskOutputPath.getFileSystem(context.getJobConf());
+        // since task output path is created on demand, 
+        // if it exists, task needs a commit
+        if (fs.exists(taskOutputPath)) {
+          return true;
+        }
+      }
+    } catch (IOException  ioe) {
+      throw ioe;
+    }
+    return false;
+  }
+
+  Path getTempTaskOutputPath(TaskAttemptContext taskContext) {
+    JobConf conf = taskContext.getJobConf();
+    Path outputPath = FileOutputFormat.getOutputPath(conf);
+    if (outputPath != null) {
+      Path p = new Path(outputPath,
+                     (FileOutputCommitter.TEMP_DIR_NAME + Path.SEPARATOR +
+                      "_" + taskContext.getTaskAttemptID().toString()));
+      try {
+        FileSystem fs = p.getFileSystem(conf);
+        return p.makeQualified(fs);
+      } catch (IOException ie) {
+        LOG.warn(StringUtils.stringifyException(ie));
+        return p;
+      }
+    }
+    return null;
+  }
+  
+  Path getWorkPath(TaskAttemptContext taskContext, Path basePath) 
+  throws IOException {
+    // ${mapred.out.dir}/_temporary
+    Path jobTmpDir = new Path(basePath, FileOutputCommitter.TEMP_DIR_NAME);
+    FileSystem fs = jobTmpDir.getFileSystem(taskContext.getJobConf());
+    if (!fs.exists(jobTmpDir)) {
+      throw new IOException("The temporary job-output directory " + 
+          jobTmpDir.toString() + " doesn't exist!"); 
+    }
+    // ${mapred.out.dir}/_temporary/_${taskid}
+    String taskid = taskContext.getTaskAttemptID().toString();
+    Path taskTmpDir = new Path(jobTmpDir, "_" + taskid);
+    if (!fs.mkdirs(taskTmpDir)) {
+      throw new IOException("Mkdirs failed to create " 
+          + taskTmpDir.toString());
+    }
+    return taskTmpDir;
+  }
+}
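
Taken together, these methods implement a rename-based commit: setupJob creates ${mapred.output.dir}/_temporary, each attempt writes under a private _${taskid} subdirectory, and commitTask promotes those files into the job output directory before deleting the attempt directory. For a job whose output directory is /user/alice/out, the layout would look roughly like this (the paths and attempt id are examples):

  /user/alice/out/_temporary/                          <- created by setupJob
  /user/alice/out/_temporary/_attempt_200809051030_0001_m_000000_0/part-00000
                                                       <- written by the attempt
  /user/alice/out/part-00000                           <- after commitTask renames it up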

+ 15 - 15
src/mapred/org/apache/hadoop/mapred/FileOutputFormat.java

@@ -152,6 +152,12 @@ public abstract class FileOutputFormat<K, V> implements OutputFormat<K, V> {
    *  
    * <h4 id="SideEffectFiles">Tasks' Side-Effect Files</h4>
    * 
+   * <p><i>Note:</i> The following is valid only if the {@link OutputCommitter}
+   *  is {@link FileOutputCommitter}. If <code>OutputCommitter</code> is not 
+   *  a <code>FileOutputCommitter</code>, the task's temporary output
+   *  directory is the same as {@link #getOutputPath(JobConf)}, i.e.
+   *  <tt>${mapred.output.dir}</tt>.</p>
+   *  
    * <p>Some applications need to create/write-to side-files, which differ from
    * the actual job-outputs.
    * 
@@ -207,29 +213,23 @@ public abstract class FileOutputFormat<K, V> implements OutputFormat<K, V> {
    */
   protected static Path getTaskOutputPath(JobConf conf, String name) 
   throws IOException {
-    // ${mapred.job.dir}
+    // ${mapred.out.dir}
     Path outputPath = getOutputPath(conf);
     if (outputPath == null) {
       throw new IOException("Undefined job output-path");
     }
 
-    // ${mapred.out.dir}/_temporary
-    Path jobTmpDir = new Path(outputPath, MRConstants.TEMP_DIR_NAME);
-    FileSystem fs = jobTmpDir.getFileSystem(conf);
-    if (!fs.exists(jobTmpDir)) {
-      throw new IOException("The temporary job-output directory " + 
-          jobTmpDir.toString() + " doesn't exist!"); 
-    }
-
-    // ${mapred.out.dir}/_temporary/_${taskid}
-    Path taskTmpDir = getWorkOutputPath(conf);
-    if (!fs.mkdirs(taskTmpDir)) {
-      throw new IOException("Mkdirs failed to create " 
-          + taskTmpDir.toString());
+    OutputCommitter committer = conf.getOutputCommitter();
+    Path workPath = outputPath;
+    TaskAttemptContext context = new TaskAttemptContext(conf,
+                TaskAttemptID.forName(conf.get("mapred.task.id")));
+    if (committer instanceof FileOutputCommitter) {
+      workPath = ((FileOutputCommitter)committer).getWorkPath(context,
+                                                              outputPath);
     }
     
     // ${mapred.out.dir}/_temporary/_${taskid}/${name}
-    return new Path(taskTmpDir, name);
+    return new Path(workPath, name);
   } 
 
   /**
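
For application code, the practical consequence is that side-effect files should be created under the task's work output path rather than directly under ${mapred.output.dir}; FileOutputCommitter then promotes them together with the regular output on commit. A hedged sketch from inside a map or reduce method (job is the task's JobConf; the file name is an example, error handling omitted):

  // Write a side-effect file into ${mapred.work.output.dir} so that a
  // successful commit moves it into the job output directory.
  Path workDir = FileOutputFormat.getWorkOutputPath(job);
  Path sideFile = new Path(workDir, "side-data.txt");
  FSDataOutputStream out = sideFile.getFileSystem(job).create(sideFile);
  out.writeBytes("auxiliary output\n");
  out.close();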

+ 2 - 1
src/mapred/org/apache/hadoop/mapred/InterTrackerProtocol.java

@@ -46,8 +46,9 @@ interface InterTrackerProtocol extends VersionedProtocol {
    * Version 14: replaced getFilesystemName with getSystemDir for HADOOP-3135
    * Version 15: Changed format of Task and TaskStatus for HADOOP-153
    * Version 16: adds ResourceStatus to TaskTrackerStatus for HADOOP-3759
+   * Version 17: Changed format of Task and TaskStatus for HADOOP-3150
    */
-  public static final long versionID = 16L;
+  public static final long versionID = 17L;
   
   public final static int TRACKERS_OK = 0;
   public final static int UNKNOWN_TASKTRACKER = 1;

+ 10 - 1
src/mapred/org/apache/hadoop/mapred/IsolationRunner.java

@@ -46,7 +46,7 @@ public class IsolationRunner {
       return TaskUmbilicalProtocol.versionID;
     }
     
-    public void done(TaskAttemptID taskid, boolean shouldPromote) throws IOException {
+    public void done(TaskAttemptID taskid) throws IOException {
       LOG.info("Task " + taskid + " reporting done.");
     }
 
@@ -66,6 +66,15 @@ public class IsolationRunner {
       return true;
     }
 
+    public void commitPending(TaskAttemptID taskId, TaskStatus taskStatus) 
+    throws IOException, InterruptedException {
+      statusUpdate(taskId, taskStatus);
+    }
+    
+    public boolean canCommit(TaskAttemptID taskid) throws IOException {
+      return true;
+    }
+    
     public boolean statusUpdate(TaskAttemptID taskId, TaskStatus taskStatus) 
     throws IOException, InterruptedException {
       StringBuffer buf = new StringBuffer("Task ");
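
These stubs mirror the new commit handshake that replaces JobTracker-side task promotion: a task with output to save reports COMMIT_PENDING via commitPending(), polls canCommit() until the JobTracker clears it through a CommitTaskAction, commits, and only then calls done(). A simplified sketch of the task-side sequence (the umbilical, taskId, taskStatus, committer and taskContext variables are assumed to be in scope; the polling interval is arbitrary):

  // Commit sequence as seen from the task, against TaskUmbilicalProtocol.
  umbilical.commitPending(taskId, taskStatus);   // report COMMIT_PENDING
  while (!umbilical.canCommit(taskId)) {         // wait for the JobTracker's go-ahead
    Thread.sleep(1000);
  }
  committer.commitTask(taskContext);             // promote the task output
  umbilical.done(taskId);                        // report successful completion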

+ 28 - 1
src/mapred/org/apache/hadoop/mapred/JobClient.java

@@ -252,6 +252,15 @@ public class JobClient extends Configured implements MRConstants, Tool  {
       return status.reduceProgress();
     }
 
+    /**
+     * A float between 0.0 and 1.0, indicating the % of cleanup work
+     * completed.
+     */
+    public float cleanupProgress() throws IOException {
+      ensureFreshStatus();
+      return status.cleanupProgress();
+    }
+
     /**
      * Returns immediately whether the whole job is done yet or not.
      */
@@ -786,6 +795,13 @@ public class JobClient extends Configured implements MRConstants, Tool  {
       out.close();
     }
 
+    // skip doing setup if there are no maps for the job.
+    // because if there are no maps, job is considered completed and successful
+    if (splits.length != 0) {
+      // do setupJob
+      job.getOutputCommitter().setupJob(new JobContext(job));
+    }
+
     //
     // Now, actually submit the job (using the submit name)
     //
@@ -967,7 +983,18 @@ public class JobClient extends Configured implements MRConstants, Tool  {
   public TaskReport[] getReduceTaskReports(JobID jobId) throws IOException {
     return jobSubmitClient.getReduceTaskReports(jobId);
   }
-   
+
+  /**
+   * Get the information of the current state of the cleanup tasks of a job.
+   * 
+   * @param jobId the job to query.
+   * @return the list of all of the cleanup tips.
+   * @throws IOException
+   */    
+  public TaskReport[] getCleanupTaskReports(JobID jobId) throws IOException {
+    return jobSubmitClient.getCleanupTaskReports(jobId);
+  }
+  
   /**@deprecated Applications should rather use {@link #getReduceTaskReports(JobID)}*/
   @Deprecated
   public TaskReport[] getReduceTaskReports(String jobId) throws IOException {
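
A client can now watch the cleanup phase alongside map and reduce progress. A small monitoring sketch (assuming the companion change to RunningJob.java in this commit exposes cleanupProgress() on the RunningJob interface; running is a RunningJob returned by JobClient, and the polling interval is arbitrary):

  // Poll a submitted job until it finishes, printing all three phases.
  while (!running.isComplete()) {
    System.out.printf("map %.0f%%  reduce %.0f%%  cleanup %.0f%%%n",
        running.mapProgress() * 100,
        running.reduceProgress() * 100,
        running.cleanupProgress() * 100);
    Thread.sleep(5000);
  }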

+ 22 - 0
src/mapred/org/apache/hadoop/mapred/JobConf.java

@@ -360,6 +360,28 @@ public class JobConf extends Configuration {
                                                               OutputFormat.class),
                                                      this);
   }
+
+  /**
+   * Get the {@link OutputCommitter} implementation for the map-reduce job,
+   * defaults to {@link FileOutputCommitter} if not specified explicitly.
+   * 
+   * @return the {@link OutputCommitter} implementation for the map-reduce job.
+   */
+  public OutputCommitter getOutputCommitter() {
+    return (OutputCommitter)ReflectionUtils.newInstance(
+      getClass("mapred.output.committer.class", FileOutputCommitter.class,
+               OutputCommitter.class), this);
+  }
+
+  /**
+   * Set the {@link OutputCommitter} implementation for the map-reduce job.
+   * 
+   * @param theClass the {@link OutputCommitter} implementation for the map-reduce 
+   *                 job.
+   */
+  public void setOutputCommitter(Class<? extends OutputCommitter> theClass) {
+    setClass("mapred.output.committer.class", theClass, OutputCommitter.class);
+  }
   
   /**
    * Set the {@link OutputFormat} implementation for the map-reduce job.
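
Wiring a custom committer into a job is then a one-liner, with FileOutputCommitter remaining the default when mapred.output.committer.class is unset. For example (NullOutputCommitter is the illustrative class sketched earlier; MyJob is a placeholder):

  JobConf job = new JobConf(MyJob.class);
  job.setOutputCommitter(NullOutputCommitter.class);     // stored in mapred.output.committer.class
  OutputCommitter committer = job.getOutputCommitter();  // instantiated via reflection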

+ 53 - 0
src/mapred/org/apache/hadoop/mapred/JobContext.java

@@ -0,0 +1,53 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import org.apache.hadoop.util.Progressable;
+
+public class JobContext {
+
+  JobConf job;
+  private Progressable progress;
+
+  JobContext(JobConf conf, Progressable progress) {
+    job = conf;
+    this.progress = progress;
+  }
+
+  JobContext(JobConf conf) {
+    this(conf, Reporter.NULL);
+  }
+  
+  /**
+   * Get the job Configuration
+   * 
+   * @return JobConf
+   */
+  public JobConf getJobConf() {
+    return job;
+  }
+  
+  /**
+   * Get the progress mechanism for reporting progress.
+   * 
+   * @return progress mechanism 
+   */
+  public Progressable getProgressible() {
+    return progress;
+  }
+}

+ 239 - 117
src/mapred/org/apache/hadoop/mapred/JobInProgress.java

@@ -61,6 +61,7 @@ class JobInProgress {
 
   TaskInProgress maps[] = new TaskInProgress[0];
   TaskInProgress reduces[] = new TaskInProgress[0];
+  TaskInProgress cleanup[] = new TaskInProgress[0];
   int numMapTasks = 0;
   int numReduceTasks = 0;
   
@@ -76,6 +77,8 @@ class JobInProgress {
   int reduceFailuresPercent = 0;
   int failedMapTIPs = 0;
   int failedReduceTIPs = 0;
+  private volatile boolean launchedCleanup = false;
+  private volatile boolean jobKilled = false;
 
   JobPriority priority = JobPriority.NORMAL;
   JobTracker jobtracker = null;
@@ -361,6 +364,7 @@ class JobInProgress {
       this.finishTime = System.currentTimeMillis();
       status.setMapProgress(1.0f);
       status.setReduceProgress(1.0f);
+      status.setCleanupProgress(1.0f);
       status.setRunState(JobStatus.SUCCEEDED);
       tasksInited.set(true);
       JobHistory.JobInfo.logStarted(profile.getJobID(), 
@@ -385,15 +389,18 @@ class JobInProgress {
       nonRunningReduces.add(reduces[i]);
     }
 
-    // create job specific temporary directory in output path
-    Path outputPath = FileOutputFormat.getOutputPath(conf);
-    if (outputPath != null) {
-      Path tmpDir = new Path(outputPath, MRConstants.TEMP_DIR_NAME);
-      FileSystem fileSys = tmpDir.getFileSystem(conf);
-      if (!fileSys.mkdirs(tmpDir)) {
-        LOG.error("Mkdirs failed to create " + tmpDir.toString());
-      }
-    }
+    // create two cleanup tips, one map and one reduce.
+    cleanup = new TaskInProgress[2];
+    // cleanup map tip. This map doesn't use any split;
+    // just assign splits[0].
+    cleanup[0] = new TaskInProgress(jobId, jobFile, splits[0], 
+            jobtracker, conf, this, numMapTasks);
+    cleanup[0].setCleanupTask();
+
+    // cleanup reduce tip.
+    cleanup[1] = new TaskInProgress(jobId, jobFile, numMapTasks,
+                       numReduceTasks, jobtracker, conf, this);
+    cleanup[1].setCleanupTask();
 
     this.status = new JobStatus(status.getJobID(), 0.0f, 0.0f, JobStatus.RUNNING);
     tasksInited.set(true);
@@ -462,6 +469,14 @@ class JobInProgress {
     return maps;
   }
     
+  /**
+   * Get the list of cleanup tasks
+   * @return the array of cleanup tasks for the job
+   */
+  TaskInProgress[] getCleanupTasks() {
+    return cleanup;
+  }
+  
   /**
    * Get the list of reduce tasks
    * @return the raw array of reduce tasks for this job
@@ -481,7 +496,7 @@ class JobInProgress {
   /**
    * Return a vector of completed TaskInProgress objects
    */
-  public Vector<TaskInProgress> reportTasksInProgress(boolean shouldBeMap,
+  public synchronized Vector<TaskInProgress> reportTasksInProgress(boolean shouldBeMap,
                                                       boolean shouldBeComplete) {
     
     Vector<TaskInProgress> results = new Vector<TaskInProgress>();
@@ -498,6 +513,21 @@ class JobInProgress {
     }
     return results;
   }
+  
+  /**
+   * Return a vector of cleanup TaskInProgress objects
+   */
+  public synchronized Vector<TaskInProgress> reportCleanupTIPs(
+                                               boolean shouldBeComplete) {
+    
+    Vector<TaskInProgress> results = new Vector<TaskInProgress>();
+    for (int i = 0; i < cleanup.length; i++) {
+      if (cleanup[i].isComplete() == shouldBeComplete) {
+        results.add(cleanup[i]);
+      }
+    }
+    return results;
+  }
 
   ////////////////////////////////////////////////////
   // Status update methods
@@ -509,6 +539,9 @@ class JobInProgress {
     double oldProgress = tip.getProgress();   // save old progress
     boolean wasRunning = tip.isRunning();
     boolean wasComplete = tip.isComplete();
+    boolean wasPending = tip.isOnlyCommitPending();
+    TaskAttemptID taskid = status.getTaskID();
+    
     // If the TIP is already completed and the task reports as SUCCEEDED then 
     // mark the task as KILLED.
     // In case of task with no promotion the task tracker will mark the task 
@@ -523,13 +556,6 @@ class JobInProgress {
         this.jobtracker.getTaskTracker(status.getTaskTracker());
       String httpTaskLogLocation = null; 
 
-      if (state == TaskStatus.State.COMMIT_PENDING) {
-        JobWithTaskContext j = new JobWithTaskContext(this, tip, 
-                                                      status.getTaskID(),
-                                                      metrics);
-        jobtracker.addToCommitQueue(j);
-      }
-
       if (null != ttStatus){
         String host;
         if (NetUtils.getStaticResolution(ttStatus.getHost()) != null) {
@@ -545,7 +571,7 @@ class JobInProgress {
       if (state == TaskStatus.State.SUCCEEDED) {
         taskEvent = new TaskCompletionEvent(
                                             taskCompletionEventTracker, 
-                                            status.getTaskID(),
+                                            taskid,
                                             tip.idWithinJob(),
                                             status.getIsMap(),
                                             TaskCompletionEvent.Status.SUCCEEDED,
@@ -554,27 +580,29 @@ class JobInProgress {
         taskEvent.setTaskRunTime((int)(status.getFinishTime() 
                                        - status.getStartTime()));
         tip.setSuccessEventNumber(taskCompletionEventTracker); 
-      }
-      //For a failed task update the JT datastructures.For the task state where
-      //only the COMMIT is pending, delegate everything to the JT thread. For
-      //failed tasks we want the JT to schedule a reexecution ASAP (and not go
-      //via the queue for the datastructures' updates).
-      else if (state == TaskStatus.State.COMMIT_PENDING) {
+      } else if (state == TaskStatus.State.COMMIT_PENDING) {
+        // If it is the first attempt reporting COMMIT_PENDING
+        // ask the task to commit.
+        if (!wasComplete && !wasPending) {
+          tip.doCommit(taskid);
+        }
         return;
-      } else if (state == TaskStatus.State.FAILED ||
-                 state == TaskStatus.State.KILLED) {
+      }
+      //For a failed task update the JT datastructures. 
+      else if (state == TaskStatus.State.FAILED ||
+               state == TaskStatus.State.KILLED) {
         // Get the event number for the (possibly) previously successful
         // task. If there exists one, then set that status to OBSOLETE 
         int eventNumber;
         if ((eventNumber = tip.getSuccessEventNumber()) != -1) {
           TaskCompletionEvent t = 
             this.taskCompletionEvents.get(eventNumber);
-          if (t.getTaskAttemptId().equals(status.getTaskID()))
+          if (t.getTaskAttemptId().equals(taskid))
             t.setTaskStatus(TaskCompletionEvent.Status.OBSOLETE);
         }
         
         // Tell the job to fail the relevant task
-        failedTask(tip, status.getTaskID(), status, ttStatus,
+        failedTask(tip, taskid, status, ttStatus,
                    wasRunning, wasComplete, metrics);
 
         // Did the task failure lead to tip failure?
@@ -586,7 +614,7 @@ class JobInProgress {
           taskCompletionStatus = TaskCompletionEvent.Status.TIPFAILED;
         }
         taskEvent = new TaskCompletionEvent(taskCompletionEventTracker, 
-                                            status.getTaskID(),
+                                            taskid,
                                             tip.idWithinJob(),
                                             status.getIsMap(),
                                             taskCompletionStatus, 
@@ -616,21 +644,24 @@ class JobInProgress {
       LOG.debug("Taking progress for " + tip.getTIPId() + " from " + 
                  oldProgress + " to " + tip.getProgress());
     }
-    double progressDelta = tip.getProgress() - oldProgress;
-    if (tip.isMapTask()) {
-      if (maps.length == 0) {
-        this.status.setMapProgress(1.0f);
-      } else {
-        this.status.setMapProgress((float) (this.status.mapProgress() +
-                                            progressDelta / maps.length));
-      }
-    } else {
-      if (reduces.length == 0) {
-        this.status.setReduceProgress(1.0f);
+    
+    if (!tip.isCleanupTask()) {
+      double progressDelta = tip.getProgress() - oldProgress;
+      if (tip.isMapTask()) {
+        if (maps.length == 0) {
+          this.status.setMapProgress(1.0f);
+        } else {
+          this.status.setMapProgress((float) (this.status.mapProgress() +
+                                              progressDelta / maps.length));
+        }
       } else {
-        this.status.setReduceProgress
-          ((float) (this.status.reduceProgress() +
-                    (progressDelta / reduces.length)));
+        if (reduces.length == 0) {
+          this.status.setReduceProgress(1.0f);
+        } else {
+          this.status.setReduceProgress
+            ((float) (this.status.reduceProgress() +
+                      (progressDelta / reduces.length)));
+        }
       }
     }
   }
@@ -662,7 +693,7 @@ class JobInProgress {
    *  Returns the total job counters, by adding together the job, 
    *  the map and the reduce counters.
    */
-  public Counters getCounters() {
+  public synchronized Counters getCounters() {
     Counters result = new Counters();
     result.incrAllCounters(getJobCounters());
     incrementTaskCounters(result, maps);
@@ -719,6 +750,77 @@ class JobInProgress {
     return result;
   }    
 
+  /**
+   * Return a CleanupTask, if appropriate, to run on the given tasktracker
+   * 
+   */
+  public synchronized Task obtainCleanupTask(TaskTrackerStatus tts, 
+                                             int clusterSize, 
+                                             int numUniqueHosts,
+                                             boolean isMapSlot
+                                            ) throws IOException {
+    if (!canLaunchCleanupTask()) {
+      return null;
+    }
+    
+    String taskTracker = tts.getTrackerName();
+    // Update the last-known clusterSize
+    this.clusterSize = clusterSize;
+    if (!shouldRunOnTaskTracker(taskTracker)) {
+      return null;
+    }
+    
+    List<TaskInProgress> cleanupTaskList = new ArrayList<TaskInProgress>();
+    if (isMapSlot) {
+      cleanupTaskList.add(cleanup[0]);
+    } else {
+      cleanupTaskList.add(cleanup[1]);
+    }
+    TaskInProgress tip = findTaskFromList(cleanupTaskList,
+                           tts, numUniqueHosts, false);
+    if (tip == null) {
+      return null;
+    }
+    
+    // Now launch the cleanupTask
+    Task result = tip.getTaskToRun(tts.getTrackerName());
+    if (result != null) {
+      launchedCleanup = true;
+      if (tip.isFirstAttempt(result.getTaskID())) {
+        JobHistory.Task.logStarted(tip.getTIPId(), 
+          tip.isMapTask() ? Values.MAP.name() : Values.REDUCE.name(),
+          System.currentTimeMillis(), tip.getSplitNodes());
+      }
+    }
+    return result;
+  }
+  
+  /**
+   * Check whether cleanup task can be launched for the job.
+   * 
+   * The cleanup task can be launched only if it has not already been
+   * launched, and either the job has been killed or all maps and
+   * reduces have completed.
+   * @return true/false
+   */
+  private synchronized boolean canLaunchCleanupTask() {
+    if (launchedCleanup) {
+      return false;
+    }
+    // check if job has failed or killed
+    if (jobKilled) {
+      return true;
+    }
+    // Check if all maps and reducers have finished.
+    boolean launchCleanupTask = 
+        ((finishedMapTasks + failedMapTIPs) == (numMapTasks));
+    if (launchCleanupTask) {
+      launchCleanupTask = 
+        ((finishedReduceTasks + failedReduceTIPs) == numReduceTasks);
+    }
+    return launchCleanupTask;
+  }
+  
   /**
    * Return a ReduceTask, if appropriate, to run on the given tasktracker.
    * We don't have cache-sensitivity for reduce tasks, as they
@@ -1408,27 +1510,45 @@ class JobInProgress {
     }
         
     // Update the running/finished map/reduce counts
-    if (tip.isMapTask()){
-      runningMapTasks -= 1;
-      finishedMapTasks += 1;
-      metrics.completeMap(taskid);
-      // remove the completed map from the resp running caches
-      retireMap(tip);
-    } else{
-      runningReduceTasks -= 1;
-      finishedReduceTasks += 1;
-      metrics.completeReduce(taskid);
-      // remove the completed reduces from the running reducers set
-      retireReduce(tip);
-    }
-        
-    //
-    // Figure out whether the Job is done
-    //
-    isJobComplete(tip, metrics);
-    
-    if (this.status.getRunState() != JobStatus.RUNNING) {
-      // The job has been killed/failed, 
+    if (!tip.isCleanupTask()) {
+      if (tip.isMapTask()) {
+        runningMapTasks -= 1;
+        finishedMapTasks += 1;
+        metrics.completeMap(taskid);
+        // remove the completed map from the resp running caches
+        retireMap(tip);
+        if ((finishedMapTasks + failedMapTIPs) == (numMapTasks)) {
+          this.status.setMapProgress(1.0f);
+        }
+      } else {
+        runningReduceTasks -= 1;
+        finishedReduceTasks += 1;
+        metrics.completeReduce(taskid);
+        // remove the completed reduces from the running reducers set
+        retireReduce(tip);
+        if ((finishedReduceTasks + failedReduceTIPs) == (numReduceTasks)) {
+          this.status.setReduceProgress(1.0f);
+        }
+      }
+    } else {
+      // cleanup task has finished. Kill the extra cleanup tip
+      if (tip.isMapTask()) {
+        // kill the reduce tip
+        cleanup[1].kill();
+      } else {
+        cleanup[0].kill();
+      }
+      //
+      // The Job is done
+      //
+      // if the job is killed, then mark the job failed.
+      if (jobKilled) {
+        killJob();
+      }
+      else {
+        jobComplete(metrics);
+      }
+      // The job has been killed/failed/successful
       // JobTracker should cleanup this task
       jobtracker.markCompletedTaskAttempt(status.getTaskTracker(), taskid);
       return false;
@@ -1438,31 +1558,18 @@ class JobInProgress {
   }
 
   /**
-   * Check if the job is done since all it's component tasks are either
+   * The job is done since all it's component tasks are either
    * successful or have failed.
    * 
-   * @param tip the current tip which completed either succesfully or failed
    * @param metrics job-tracker metrics
-   * @return
    */
-  private boolean isJobComplete(TaskInProgress tip, JobTrackerInstrumentation metrics) {
-    // Job is complete if total-tips = finished-tips + failed-tips
-    boolean allDone = 
-      ((finishedMapTasks + failedMapTIPs) == numMapTasks);
-    if (allDone) {
-      if (tip.isMapTask()) {
-        this.status.setMapProgress(1.0f);              
-      }
-      allDone = 
-        ((finishedReduceTasks + failedReduceTIPs) == numReduceTasks);
-    }
-
+  private void jobComplete(JobTrackerInstrumentation metrics) {
     //
-    // If all tasks are complete, then the job is done!
+    // All tasks are complete, then the job is done!
     //
-    if (this.status.getRunState() == JobStatus.RUNNING && allDone) {
+    if (this.status.getRunState() == JobStatus.RUNNING ) {
       this.status.setRunState(JobStatus.SUCCEEDED);
-      this.status.setReduceProgress(1.0f);
+      this.status.setCleanupProgress(1.0f);
       this.finishTime = System.currentTimeMillis();
       garbageCollect();
       LOG.info("Job " + this.status.getJobID() + 
@@ -1472,11 +1579,21 @@ class JobInProgress {
                                      this.finishedReduceTasks, failedMapTasks, 
                                      failedReduceTasks, getCounters());
       metrics.completeJob();
-      return true;
     }
-    
-    return false;
   }
+  
+  private synchronized void killJob() {
+    if ((status.getRunState() == JobStatus.RUNNING) ||
+        (status.getRunState() == JobStatus.PREP)) {
+      this.status = new JobStatus(status.getJobID(),
+                          1.0f, 1.0f, 1.0f, JobStatus.FAILED);
+      this.finishTime = System.currentTimeMillis();
+      JobHistory.JobInfo.logFailed(this.status.getJobID(), finishTime, 
+              this.finishedMapTasks, this.finishedReduceTasks);
+      garbageCollect();
+    }
+  }
+
   /**
    * Kill the job and all its component tasks.
    */
@@ -1484,8 +1601,6 @@ class JobInProgress {
     if ((status.getRunState() == JobStatus.RUNNING) ||
          (status.getRunState() == JobStatus.PREP)) {
       LOG.info("Killing job '" + this.status.getJobID() + "'");
-      this.status = new JobStatus(status.getJobID(), 1.0f, 1.0f, JobStatus.FAILED);
-      this.finishTime = System.currentTimeMillis();
       this.runningMapTasks = 0;
       this.runningReduceTasks = 0;
       //
@@ -1497,9 +1612,7 @@ class JobInProgress {
       for (int i = 0; i < reduces.length; i++) {
         reduces[i].kill();
       }
-      JobHistory.JobInfo.logFailed(this.status.getJobID(), finishTime, 
-                                   this.finishedMapTasks, this.finishedReduceTasks);
-      garbageCollect();
+      jobKilled = true;
     }
   }
 
@@ -1531,7 +1644,9 @@ class JobInProgress {
         
     //update running  count on task failure.
     if (wasRunning && !isRunning) {
-      if (tip.isMapTask()){
+      if (tip.isCleanupTask()) {
+        launchedCleanup = false;
+      } else if (tip.isMapTask()) {
         runningMapTasks -= 1;
         // remove from the running queue and put it in the non-running cache
         // if the tip is not complete i.e if the tip still needs to be run
@@ -1592,10 +1707,12 @@ class JobInProgress {
         
     // After this, try to assign tasks with the one after this, so that
     // the failed task goes to the end of the list.
-    if (tip.isMapTask()) {
-      failedMapTasks++; 
-    } else {
-      failedReduceTasks++; 
+    if (!tip.isCleanupTask()) {
+      if (tip.isMapTask()) {
+        failedMapTasks++;
+      } else {
+        failedReduceTasks++; 
+      }
     }
             
     //
@@ -1620,8 +1737,8 @@ class JobInProgress {
       // Allow upto 'mapFailuresPercent' of map tasks to fail or
       // 'reduceFailuresPercent' of reduce tasks to fail
       //
-      boolean killJob = 
-        tip.isMapTask() ? 
+      boolean killJob = tip.isCleanupTask() ? true :
+                        tip.isMapTask() ? 
             ((++failedMapTIPs*100) > (mapFailuresPercent*numMapTasks)) :
             ((++failedReduceTIPs*100) > (reduceFailuresPercent*numReduceTasks));
       
@@ -1638,18 +1755,28 @@ class JobInProgress {
                                      this.finishedMapTasks, 
                                      this.finishedReduceTasks
                                     );
-        kill();
-      } else {
-        isJobComplete(tip, metrics);
+        if (tip.isCleanupTask()) {
+          // kill the other tip
+          if (tip.isMapTask()) {
+            cleanup[1].kill();
+          } else {
+            cleanup[0].kill();
+          }
+          killJob();
+        } else {
+          kill();
+        }
       }
       
       //
       // Update the counters
       //
-      if (tip.isMapTask()) {
-        jobCounters.incrCounter(Counter.NUM_FAILED_MAPS, 1);
-      } else {
-        jobCounters.incrCounter(Counter.NUM_FAILED_REDUCES, 1);
+      if (!tip.isCleanupTask()) {
+        if (tip.isMapTask()) {
+          jobCounters.incrCounter(Counter.NUM_FAILED_MAPS, 1);
+        } else {
+          jobCounters.incrCounter(Counter.NUM_FAILED_REDUCES, 1);
+        }
       }
     }
   }
@@ -1711,16 +1838,6 @@ class JobInProgress {
       Path tempDir = new Path(jobtracker.getSystemDir(), jobId.toString());
       FileSystem fs = tempDir.getFileSystem(conf);
       fs.delete(tempDir, true); 
-
-      // delete the temporary directory in output directory
-      Path outputPath = FileOutputFormat.getOutputPath(conf);
-      if (outputPath != null) {
-        Path tmpDir = new Path(outputPath, MRConstants.TEMP_DIR_NAME);
-        FileSystem fileSys = tmpDir.getFileSystem(conf);
-        if (fileSys.exists(tmpDir)) {
-          fileSys.delete(tmpDir, true);
-        }
-      }
     } catch (IOException e) {
       LOG.warn("Error cleaning up "+profile.getJobID()+": "+e);
     }
@@ -1736,8 +1853,13 @@ class JobInProgress {
   /**
    * Return the TaskInProgress that matches the tipid.
    */
-  public TaskInProgress getTaskInProgress(TaskID tipid){
+  public synchronized TaskInProgress getTaskInProgress(TaskID tipid) {
     if (tipid.isMap()) {
+      for (int i = 0; i < cleanup.length; i++) {
+        if (tipid.equals(cleanup[i].getTIPId())){
+          return cleanup[i];
+        }
+      }
       for (int i = 0; i < maps.length; i++) {
         if (tipid.equals(maps[i].getTIPId())){
           return maps[i];
@@ -1758,7 +1880,7 @@ class JobInProgress {
    * @param mapId the id of the map
    * @return the task status of the completed task
    */
-  public TaskStatus findFinishedMap(int mapId) {
+  public synchronized TaskStatus findFinishedMap(int mapId) {
     TaskInProgress tip = maps[mapId];
     if (tip.isComplete()) {
       TaskStatus[] statuses = tip.getTaskStatuses();

+ 53 - 14
src/mapred/org/apache/hadoop/mapred/JobQueueTaskScheduler.java

@@ -73,20 +73,68 @@ class JobQueueTaskScheduler extends TaskScheduler {
                                  0.01f);
   }
 
+  protected Task getCleanupTask(int numMaps, int numReduces,
+                                int maxMapTasks, int maxReduceTasks,
+                                TaskTrackerStatus taskTracker,
+                                int numTaskTrackers,
+                                Collection<JobInProgress> jobQueue) 
+  throws IOException {
+    Task t = null;
+    if (numMaps < maxMapTasks) {
+      for (JobInProgress job : jobQueue) {
+        t = job.obtainCleanupTask(taskTracker, numTaskTrackers,
+                       taskTrackerManager.getNumberOfUniqueHosts(), true);
+        if (t != null) {
+          return t;
+        }
+      }
+    }
+    if (numReduces < maxReduceTasks) {
+      for (JobInProgress job : jobQueue) {
+        t = job.obtainCleanupTask(taskTracker, numTaskTrackers,
+                       taskTrackerManager.getNumberOfUniqueHosts(), false);
+        if (t != null) {
+          return t;
+        }
+      }
+    }
+    return t;
+  }
+  
   @Override
   public synchronized List<Task> assignTasks(TaskTrackerStatus taskTracker)
       throws IOException {
-    //
-    // Compute average map and reduce task numbers across pool
-    //
-    int remainingReduceLoad = 0;
-    int remainingMapLoad = 0;
 
     ClusterStatus clusterStatus = taskTrackerManager.getClusterStatus();
     int numTaskTrackers = clusterStatus.getTaskTrackers();
 
     Collection<JobInProgress> jobQueue =
       jobQueueJobInProgressListener.getJobQueue();
+
+    //
+    // Get map + reduce counts for the current tracker.
+    //
+    int maxCurrentMapTasks = taskTracker.getMaxMapTasks();
+    int maxCurrentReduceTasks = taskTracker.getMaxReduceTasks();
+    int numMaps = taskTracker.countMapTasks();
+    int numReduces = taskTracker.countReduceTasks();
+
+
+    // cleanup task has the highest priority, it should be 
+    // launched as soon as the job is done.
+    synchronized (jobQueue) {
+      Task t = getCleanupTask(numMaps, numReduces, maxCurrentMapTasks,
+                 maxCurrentReduceTasks, taskTracker, numTaskTrackers, jobQueue);
+      if (t != null) {
+        return Collections.singletonList(t);
+      }
+    }
+
+    //
+    // Compute average map and reduce task numbers across pool
+    //
+    int remainingReduceLoad = 0;
+    int remainingMapLoad = 0;
     synchronized (jobQueue) {
       for (JobInProgress job : jobQueue) {
         if (job.getStatus().getRunState() == JobStatus.RUNNING) {
@@ -98,8 +146,6 @@ class JobQueueTaskScheduler extends TaskScheduler {
       }
     }
 
-    int maxCurrentMapTasks = taskTracker.getMaxMapTasks();
-    int maxCurrentReduceTasks = taskTracker.getMaxReduceTasks();
     // find out the maximum number of maps or reduces that we are willing
     // to run on any node.
     int maxMapLoad = 0;
@@ -113,13 +159,6 @@ class JobQueueTaskScheduler extends TaskScheduler {
                                                / numTaskTrackers));
     }
         
-    //
-    // Get map + reduce counts for the current tracker.
-    //
-
-    int numMaps = taskTracker.countMapTasks();
-    int numReduces = taskTracker.countReduceTasks();
-    
     int totalMaps = clusterStatus.getMapTasks();
     int totalMapTaskCapacity = clusterStatus.getMaxMapTasks();
     int totalReduces = clusterStatus.getReduceTasks();

+ 32 - 1
src/mapred/org/apache/hadoop/mapred/JobStatus.java

@@ -49,6 +49,7 @@ public class JobStatus implements Writable {
   private JobID jobid;
   private float mapProgress;
   private float reduceProgress;
+  private float cleanupProgress;
   private int runState;
   private long startTime;
   private String user;
@@ -63,16 +64,31 @@ public class JobStatus implements Writable {
    * @param jobid The jobid of the job
    * @param mapProgress The progress made on the maps
    * @param reduceProgress The progress made on the reduces
+   * @param cleanupProgress The progress made on cleanup
    * @param runState The current state of the job
    */
-  public JobStatus(JobID jobid, float mapProgress, float reduceProgress, int runState) {
+  public JobStatus(JobID jobid, float mapProgress, float reduceProgress,
+                   float cleanupProgress, int runState) {
     this.jobid = jobid;
     this.mapProgress = mapProgress;
     this.reduceProgress = reduceProgress;
+    this.cleanupProgress = cleanupProgress;
     this.runState = runState;
     this.user = "nobody";
   }
 
+  /**
+   * Create a job status object for a given jobid.
+   * @param jobid The jobid of the job
+   * @param mapProgress The progress made on the maps
+   * @param reduceProgress The progress made on the reduces
+   * @param runState The current state of the job
+   */
+  public JobStatus(JobID jobid, float mapProgress, float reduceProgress,
+                   int runState) {
+    this(jobid, mapProgress, reduceProgress, 0.0f, runState);
+  }
+
   /**
    * @deprecated use getJobID instead
    */
@@ -96,7 +112,20 @@ public class JobStatus implements Writable {
   synchronized void setMapProgress(float p) { 
     this.mapProgress = (float) Math.min(1.0, Math.max(0.0, p)); 
   }
+
+  /**
+   * @return Percentage of progress in cleanup 
+   */
+  public synchronized float cleanupProgress() { return cleanupProgress; }
     
+  /**
+   * Sets the cleanup progress of this job
+   * @param p The value of cleanup progress to set to
+   */
+  synchronized void setCleanupProgress(float p) { 
+    this.cleanupProgress = (float) Math.min(1.0, Math.max(0.0, p)); 
+  }
+
   /**
    * @return Percentage of progress in reduce 
    */
@@ -150,6 +179,7 @@ public class JobStatus implements Writable {
     jobid.write(out);
     out.writeFloat(mapProgress);
     out.writeFloat(reduceProgress);
+    out.writeFloat(cleanupProgress);
     out.writeInt(runState);
     out.writeLong(startTime);
     Text.writeString(out, user);
@@ -159,6 +189,7 @@ public class JobStatus implements Writable {
     this.jobid = JobID.read(in);
     this.mapProgress = in.readFloat();
     this.reduceProgress = in.readFloat();
+    this.cleanupProgress = in.readFloat();
     this.runState = in.readInt();
     this.startTime = in.readLong();
     this.user = Text.readString(in);
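
Because cleanupProgress is now serialized between reduceProgress and runState, old and new JobStatus objects are not wire-compatible, which is part of why the RPC version numbers are bumped elsewhere in this commit. A hedged round-trip sketch using Hadoop's Writable buffers (the job id is an example; the no-argument JobStatus constructor is assumed):

  import org.apache.hadoop.io.DataInputBuffer;
  import org.apache.hadoop.io.DataOutputBuffer;

  JobStatus before = new JobStatus(JobID.forName("job_200809051030_0001"),
                                   0.5f, 0.25f, 0.0f, JobStatus.RUNNING);
  DataOutputBuffer out = new DataOutputBuffer();
  before.write(out);

  DataInputBuffer in = new DataInputBuffer();
  in.reset(out.getData(), out.getLength());
  JobStatus after = new JobStatus();
  after.readFields(in);
  assert after.cleanupProgress() == 0.0f;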

+ 8 - 1
src/mapred/org/apache/hadoop/mapred/JobSubmissionProtocol.java

@@ -41,8 +41,10 @@ interface JobSubmissionProtocol extends VersionedProtocol {
    * Version 9: change the counter representation for HADOOP-1915
    * Version 10: added getSystemDir for HADOOP-3135
    * Version 11: changed JobProfile to include the queue name for HADOOP-3698
+   * Version 12: Added getCleanupTaskReports and 
+   *             cleanupProgress to JobStatus as part of HADOOP-3150
    */
-  public static final long versionID = 11L;
+  public static final long versionID = 12L;
 
   /**
    * Allocate a name for the job.
@@ -104,6 +106,11 @@ interface JobSubmissionProtocol extends VersionedProtocol {
    */
   public TaskReport[] getReduceTaskReports(JobID jobid) throws IOException;
 
+  /**
+   * Grab a bunch of info on the cleanup tasks that make up the job
+   */
+  public TaskReport[] getCleanupTaskReports(JobID jobid) throws IOException;
+
   /**
    * A MapReduce system always operates on a single filesystem.  This 
    * function returns the fs name.  ('local' if the localfs; 'addr:port' 

+ 54 - 194
src/mapred/org/apache/hadoop/mapred/JobTracker.java

@@ -41,7 +41,6 @@ import java.util.TreeSet;
 import java.util.Vector;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.LinkedBlockingQueue;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -62,7 +61,6 @@ import org.apache.hadoop.net.NodeBase;
 import org.apache.hadoop.net.ScriptBasedMapping;
 import org.apache.hadoop.security.AccessControlIOException;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.UnixUserGroupInformation;
 import org.apache.hadoop.util.HostsFileReader;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
@@ -512,8 +510,6 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
   Path systemDir = null;
   private JobConf conf;
 
-  private Thread taskCommitThread;
-  
   private QueueManager queueManager;
 
   /**
@@ -693,8 +689,6 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
     this.retireJobsThread.start();
     taskScheduler.start();
     expireLaunchingTaskThread.start();
-    this.taskCommitThread = new TaskCommitQueue();
-    this.taskCommitThread.start();
 
     if (completedJobStatusStore.isActive()) {
       completedJobsStoreThread = new Thread(completedJobStatusStore,
@@ -749,15 +743,6 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
         ex.printStackTrace();
       }
     }
-    if (this.taskCommitThread != null) {
-      LOG.info("Stopping TaskCommit thread");
-      this.taskCommitThread.interrupt();
-      try {
-        this.taskCommitThread.join();
-      } catch (InterruptedException ex) {
-        ex.printStackTrace();
-      }
-    }
     if (this.completedJobsStoreThread != null &&
         this.completedJobsStoreThread.isAlive()) {
       LOG.info("Stopping completedJobsStore thread");
@@ -1258,6 +1243,12 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
       actions.addAll(killTasksList);
     }
      
+    // Check for tasks whose outputs can be saved
+    List<TaskTrackerAction> commitTasksList = getTasksToSave(status);
+    if (commitTasksList != null) {
+      actions.addAll(commitTasksList);
+    }
+
     // calculate next heartbeat interval and put in heartbeat response
     int nextInterval = getNextHeartbeatInterval();
     response.setHeartbeatInterval(nextInterval);
@@ -1442,6 +1433,30 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
     return null;
   }
 
+  /**
+   * A tracker wants to know if any of its Tasks can be committed 
+   */
+  private synchronized List<TaskTrackerAction> getTasksToSave(
+                                                 TaskTrackerStatus tts) {
+    List<TaskStatus> taskStatuses = tts.getTaskReports();
+    if (taskStatuses != null) {
+      List<TaskTrackerAction> saveList = new ArrayList<TaskTrackerAction>();
+      for (TaskStatus taskStatus : taskStatuses) {
+        if (taskStatus.getRunState() == TaskStatus.State.COMMIT_PENDING) {
+          TaskAttemptID taskId = taskStatus.getTaskID();
+          TaskInProgress tip = taskidToTIPMap.get(taskId);
+          if (tip.shouldCommit(taskId)) {
+            saveList.add(new CommitTaskAction(taskId));
+            LOG.debug(tts.getTrackerName() + 
+                      " -> CommitTaskAction: " + taskId);
+          }
+        }
+      }
+      return saveList;
+    }
+    return null;
+  }
+  
   /**
    * Grab the local fs name
    */
@@ -1622,7 +1637,30 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
       return reports.toArray(new TaskReport[reports.size()]);
     }
   }
-    
+
+  public synchronized TaskReport[] getCleanupTaskReports(JobID jobid) {
+    JobInProgress job = jobs.get(jobid);
+    if (job == null) {
+      return new TaskReport[0];
+    } else {
+      Vector<TaskReport> reports = new Vector<TaskReport>();
+      Vector<TaskInProgress> completeTasks = job.reportCleanupTIPs(true);
+      for (Iterator<TaskInProgress> it = completeTasks.iterator();
+           it.hasNext();) {
+        TaskInProgress tip = (TaskInProgress) it.next();
+        reports.add(tip.generateSingleReport());
+      }
+      Vector<TaskInProgress> incompleteTasks = job.reportCleanupTIPs(false);
+      for (Iterator<TaskInProgress> it = incompleteTasks.iterator(); 
+           it.hasNext();) {
+        TaskInProgress tip = (TaskInProgress) it.next();
+        reports.add(tip.generateSingleReport());
+      }
+      return reports.toArray(new TaskReport[reports.size()]);
+    }
+  
+  }
+  
   TaskCompletionEvent[] EMPTY_EVENTS = new TaskCompletionEvent[0];
   
   /* 
@@ -1886,184 +1924,6 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
       removeMarkedTasks(trackerName);
     }
   }
-
-  /**
-   * Add a job's completed task (either successful or failed/killed) to the 
-   * {@link TaskCommitQueue}. 
-   * @param j completed task (either successful or failed/killed)
-   */
-  void addToCommitQueue(JobInProgress.JobWithTaskContext j) {
-    ((TaskCommitQueue)taskCommitThread).addToQueue(j);
-  }
-  
-  /**
-   * A thread which does all of the {@link FileSystem}-related operations for
-   * tasks. It picks the next task in the queue, promotes outputs of 
-   * {@link TaskStatus.State#SUCCEEDED} tasks & discards outputs for 
-   * {@link TaskStatus.State#FAILED} or {@link TaskStatus.State#KILLED} tasks.
-   */
-  private class TaskCommitQueue extends Thread {
-    
-    private LinkedBlockingQueue<JobInProgress.JobWithTaskContext> queue = 
-            new LinkedBlockingQueue <JobInProgress.JobWithTaskContext>();
-        
-    public TaskCommitQueue() {
-      setName("Task Commit Thread");
-      setDaemon(true);
-    }
-    
-    public void addToQueue(JobInProgress.JobWithTaskContext j) {
-      while (true) { // loop until the element gets added
-        try {
-          queue.put(j);
-          return;
-        } catch (InterruptedException ie) {}
-      }
-    }
-       
-    @Override
-    public void run() {
-      int  batchCommitSize = conf.getInt("jobtracker.task.commit.batch.size", 
-                                         5000); 
-      while (!isInterrupted()) {
-        try {
-          ArrayList <JobInProgress.JobWithTaskContext> jobList = 
-            new ArrayList<JobInProgress.JobWithTaskContext>(batchCommitSize);
-          // Block if the queue is empty
-          jobList.add(queue.take());  
-          queue.drainTo(jobList, batchCommitSize);
-
-          JobInProgress[] jobs = new JobInProgress[jobList.size()];
-          TaskInProgress[] tips = new TaskInProgress[jobList.size()];
-          TaskAttemptID[] taskids = new TaskAttemptID[jobList.size()];
-          JobTrackerInstrumentation[] metrics = new JobTrackerInstrumentation[jobList.size()];
-
-          Iterator<JobInProgress.JobWithTaskContext> iter = jobList.iterator();
-          int count = 0;
-
-          while (iter.hasNext()) {
-            JobInProgress.JobWithTaskContext j = iter.next();
-            jobs[count] = j.getJob();
-            tips[count] = j.getTIP();
-            taskids[count]= j.getTaskID();
-            metrics[count] = j.getJobTrackerMetrics();
-            ++count;
-          }
-
-          Task[] tasks = new Task[jobList.size()];
-          TaskStatus[] status = new TaskStatus[jobList.size()];
-          boolean[] isTipComplete = new boolean[jobList.size()];
-          TaskStatus.State[] states = new TaskStatus.State[jobList.size()];
-
-          synchronized (JobTracker.this) {
-            for(int i = 0; i < jobList.size(); ++i) {
-              synchronized (jobs[i]) {
-                synchronized (tips[i]) {
-                  status[i] = tips[i].getTaskStatus(taskids[i]);
-                  tasks[i] = tips[i].getTask(taskids[i]);
-                  states[i] = status[i].getRunState();
-                  isTipComplete[i] = tips[i].isComplete();
-                }
-              }
-            }
-          }
-
-          //For COMMIT_PENDING tasks, we save the task output in the dfs
-          //as well as manipulate the JT datastructures to reflect a
-          //successful task. This guarantees that we don't declare a task
-          //as having succeeded until we have successfully completed the
-          //dfs operations.
-          //For failed tasks, we just do the dfs operations here. The
-          //datastructures updates is done earlier as soon as the failure
-          //is detected so that the JT can immediately schedule another
-          //attempt for that task.
-
-          Set<TaskID> seenTIPs = new HashSet<TaskID>();
-          for(int index = 0; index < jobList.size(); ++index) {
-            try {
-              if (states[index] == TaskStatus.State.COMMIT_PENDING) {
-                if (!isTipComplete[index]) {
-                  if (!seenTIPs.contains(tips[index].getTIPId())) {
-                    tasks[index].saveTaskOutput();
-                    seenTIPs.add(tips[index].getTIPId());
-                  } else {
-                    // since other task of this tip has saved its output
-                    isTipComplete[index] = true;
-                  }
-                }
-              }
-            } catch (IOException ioe) {
-              // Oops! Failed to copy the task's output to its final place;
-              // fail the task!
-              states[index] = TaskStatus.State.FAILED;
-              synchronized (JobTracker.this) {
-                String reason = "Failed to rename output with the exception: " 
-                                + StringUtils.stringifyException(ioe);
-                TaskStatus.Phase phase = (tips[index].isMapTask() 
-                                          ? TaskStatus.Phase.MAP 
-                                          : TaskStatus.Phase.REDUCE);
-                jobs[index].failedTask(tips[index], status[index].getTaskID(), 
-                                       reason, phase, TaskStatus.State.FAILED, 
-                                       status[index].getTaskTracker(), null);
-              }
-              LOG.info("Failed to rename the output of " 
-                       + status[index].getTaskID() + " with " 
-                       + StringUtils.stringifyException(ioe));
-            }
-          }
-
-          synchronized (JobTracker.this) {
-            //do a check for the case where after the task went to
-            //COMMIT_PENDING, it was lost. So although we would have
-            //saved the task output, we cannot declare it a SUCCESS.
-            for(int i = 0; i < jobList.size(); ++i) {
-              TaskStatus newStatus = null;
-              if(states[i] == TaskStatus.State.COMMIT_PENDING) {
-                synchronized (jobs[i]) {
-                  synchronized (tips[i]) {
-                    status[i] = tips[i].getTaskStatus(taskids[i]);
-                    if (!isTipComplete[i]) {
-                      if (status[i].getRunState() 
-                          != TaskStatus.State.COMMIT_PENDING) {
-                        states[i] = TaskStatus.State.KILLED;
-                      } else {
-                        states[i] = TaskStatus.State.SUCCEEDED;
-                      }
-                    } else {
-                      tips[i].addDiagnosticInfo(tasks[i].getTaskID(), 
-                                                "Already completed  TIP");
-                      states[i] = TaskStatus.State.KILLED;
-                    }
-                    //create new status if required. If the state changed 
-                    //from COMMIT_PENDING to KILLED in the JobTracker, while 
-                    //we were saving the output,the JT would have called 
-                    //updateTaskStatus and we don't need to call it again
-                    newStatus = (TaskStatus)status[i].clone();
-                    newStatus.setRunState(states[i]);
-                    newStatus.setProgress(
-                        (states[i] == TaskStatus.State.SUCCEEDED) 
-                        ? 1.0f 
-                        : 0.0f);
-                  }
-                  if (newStatus != null) {
-                    jobs[i].updateTaskStatus(tips[i], newStatus, metrics[i]);
-                  }
-                }
-              }
-            }
-          }
-        } catch (InterruptedException ie) {
-          break;
-        }
-        catch (Throwable t) {
-          LOG.error(getName() + " got an exception: " +
-                    StringUtils.stringifyException(t));
-        }
-      }
-      
-      LOG.warn(getName() + " exiting..."); 
-    }
-  }
   
 
   /**
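
With the TaskCommitQueue gone, the commit decision rides the regular heartbeat: getTasksToSave() above turns COMMIT_PENDING statuses into CommitTaskActions, and the tracker records them so the child's canCommit() poll can succeed. A simplified sketch of the tracker-side dispatch (not literal patch code; see the TaskTracker diff below for the real loop):

package org.apache.hadoop.mapred;

import java.util.List;

// Simplified from TaskTracker.offerService; commitResponses is the shared
// list that canCommit() consults.
class HeartbeatDispatchSketch {
  static void dispatch(TaskTrackerAction[] actions,
                       List<TaskAttemptID> commitResponses) {
    for (TaskTrackerAction action : actions) {
      if (action instanceof CommitTaskAction) {
        TaskAttemptID id = ((CommitTaskAction) action).getTaskID();
        if (!commitResponses.contains(id)) {
          commitResponses.add(id);  // unblocks the child's canCommit() poll
        }
      }
    }
  }
}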

+ 17 - 5
src/mapred/org/apache/hadoop/mapred/LimitTasksPerJobTaskScheduler.java

@@ -62,17 +62,32 @@ class LimitTasksPerJobTaskScheduler extends JobQueueTaskScheduler {
   public synchronized List<Task> assignTasks(TaskTrackerStatus taskTracker)
       throws IOException {
 
+    final int numTaskTrackers =
+        taskTrackerManager.getClusterStatus().getTaskTrackers();
+    Collection<JobInProgress> jobQueue =
+      jobQueueJobInProgressListener.getJobQueue();
+    Task task;
+
     /* Stats about the current taskTracker */
     final int mapTasksNumber = taskTracker.countMapTasks();
     final int reduceTasksNumber = taskTracker.countReduceTasks();
     final int maximumMapTasksNumber = taskTracker.getMaxMapTasks();
     final int maximumReduceTasksNumber = taskTracker.getMaxReduceTasks();
+
+    // check if cleanup task can be launched
+    synchronized (jobQueue) {
+      task = getCleanupTask(mapTasksNumber, reduceTasksNumber,
+               maximumMapTasksNumber, maximumReduceTasksNumber,
+               taskTracker, numTaskTrackers, jobQueue);
+      if (task != null) {
+        return Collections.singletonList(task);
+      }
+    }
+
     /*
      * Statistics about the whole cluster. Most are approximate because of
      * concurrency
      */
-    final int numTaskTrackers =
-      taskTrackerManager.getClusterStatus().getTaskTrackers();
     final int[] maxMapAndReduceLoad = getMaxMapAndReduceLoad(
         maximumMapTasksNumber, maximumReduceTasksNumber);
     final int maximumMapLoad = maxMapAndReduceLoad[0];
@@ -112,11 +127,8 @@ class LimitTasksPerJobTaskScheduler extends JobQueueTaskScheduler {
         continue;
       }
       /* For each job, start its tasks */
-      Collection<JobInProgress> jobQueue =
-        jobQueueJobInProgressListener.getJobQueue();
       synchronized (jobQueue) {
         for (JobInProgress job : jobQueue) {
-          Task task;
           /* Ignore non running jobs */
           if (job.getStatus().getRunState() != JobStatus.RUNNING) {
             continue;

+ 25 - 49
src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java

@@ -111,18 +111,9 @@ class LocalJobRunner implements JobSubmissionProtocol {
           numReduceTasks = 1;
           job.setNumReduceTasks(1);
         }
-        // create job specific temp directory in output path
-        Path outputPath = FileOutputFormat.getOutputPath(job);
-        FileSystem outputFs = null;
-        Path tmpDir = null;
-        if (outputPath != null) {
-          tmpDir = new Path(outputPath, MRConstants.TEMP_DIR_NAME);
-          outputFs = tmpDir.getFileSystem(job);
-          if (!outputFs.mkdirs(tmpDir)) {
-            LOG.error("Mkdirs failed to create " + tmpDir.toString());
-          }
-        }
-
+        JobContext jContext = new JobContext(conf);
+        OutputCommitter outputCommitter = job.getOutputCommitter();
+        
         DataOutputBuffer buffer = new DataOutputBuffer();
         for (int i = 0; i < splits.length; i++) {
           TaskAttemptID mapId = new TaskAttemptID(new TaskID(jobId, true, i), 0);  
@@ -136,25 +127,12 @@ class LocalJobRunner implements JobSubmissionProtocol {
                                     splits[i].getClass().getName(),
                                     split);
           JobConf localConf = new JobConf(job);
-          if (outputFs != null) {
-            if (outputFs.exists(tmpDir)) {
-              Path taskTmpDir = new Path(tmpDir, "_" + mapId);
-              if (!outputFs.mkdirs(taskTmpDir)) {
-                throw new IOException("Mkdirs failed to create " 
-                                       + taskTmpDir.toString());
-              }
-            } else {
-              throw new IOException("The directory " + tmpDir.toString()
-                                   + " doesnt exist " );
-            }
-          }
           map.setJobFile(localFile.toString());
           map.localizeConfiguration(localConf);
           map.setConf(localConf);
           map_tasks += 1;
           myMetrics.launchMap(mapId);
           map.run(localConf, this);
-          map.saveTaskOutput();
           myMetrics.completeMap(mapId);
           map_tasks -= 1;
           updateCounters(map);
@@ -180,25 +158,12 @@ class LocalJobRunner implements JobSubmissionProtocol {
               ReduceTask reduce = new ReduceTask(file.toString(), 
                                                  reduceId, 0, mapIds.size());
               JobConf localConf = new JobConf(job);
-              if (outputFs != null) {
-                if (outputFs.exists(tmpDir)) {
-                  Path taskTmpDir = new Path(tmpDir, "_" + reduceId);
-                  if (!outputFs.mkdirs(taskTmpDir)) {
-                    throw new IOException("Mkdirs failed to create " 
-                                           + taskTmpDir.toString());
-                  }
-                } else {
-                  throw new IOException("The directory " + tmpDir.toString()
-                                       + " doesnt exist ");
-                }
-              }
               reduce.setJobFile(localFile.toString());
               reduce.localizeConfiguration(localConf);
               reduce.setConf(localConf);
               reduce_tasks += 1;
               myMetrics.launchReduce(reduce.getTaskID());
               reduce.run(localConf, this);
-              reduce.saveTaskOutput();
               myMetrics.completeReduce(reduce.getTaskID());
               reduce_tasks -= 1;
               updateCounters(reduce);
@@ -213,15 +178,8 @@ class LocalJobRunner implements JobSubmissionProtocol {
           }
         }
         // delete the temporary directory in output directory
-        try {
-          if (outputFs != null) {
-            if (outputFs.exists(tmpDir)) {
-              outputFs.delete(tmpDir, true);
-            }
-          }
-        } catch (IOException e) {
-          LOG.error("Exception in deleting " + tmpDir.toString());
-        }
+        outputCommitter.cleanupJob(jContext);
+        status.setCleanupProgress(1.0f);
 
         this.status.setRunState(JobStatus.SUCCEEDED);
 
@@ -264,6 +222,16 @@ class LocalJobRunner implements JobSubmissionProtocol {
       return true;
     }
 
+    /**
+     * The task reports that it is in COMMIT_PENDING
+     * and is waiting for the commit response
+     */
+    public void commitPending(TaskAttemptID taskid,
+                              TaskStatus taskStatus) 
+    throws IOException, InterruptedException {
+      statusUpdate(taskid, taskStatus);
+    }
+
     /**
      * Updates counters corresponding to completed tasks.
      * @param task A map or reduce task which has just been 
@@ -285,8 +253,13 @@ class LocalJobRunner implements JobSubmissionProtocol {
     public boolean ping(TaskAttemptID taskid) throws IOException {
       return true;
     }
-
-    public void done(TaskAttemptID taskId, boolean shouldPromote) throws IOException {
+    
+    public boolean canCommit(TaskAttemptID taskid) 
+    throws IOException {
+      return true;
+    }
+    
+    public void done(TaskAttemptID taskId) throws IOException {
       int taskIndex = mapIds.indexOf(taskId);
       if (taskIndex >= 0) {                       // mapping
         status.setMapProgress(1.0f);
@@ -352,6 +325,9 @@ class LocalJobRunner implements JobSubmissionProtocol {
   public TaskReport[] getReduceTaskReports(JobID id) {
     return new TaskReport[0];
   }
+  public TaskReport[] getCleanupTaskReports(JobID id) {
+    return new TaskReport[0];
+  }
 
   public JobStatus getJobStatus(JobID id) {
     Job job = jobs.get(id);
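
The hand-rolled _temporary handling removed above is now the committer's responsibility. A hedged sketch of the lifecycle a runner drives (the wrapper method is illustrative, not part of the patch; both committer calls appear verbatim in the diff above):

package org.apache.hadoop.mapred;

import java.io.IOException;

class CommitterLifecycleSketch {
  static void runJobOutputLifecycle(JobConf job) throws IOException {
    JobContext context = new JobContext(job);
    OutputCommitter committer = job.getOutputCommitter();
    committer.setupJob(context);    // normally done once, by the job client
    // ... tasks run, each doing setupTask/needsTaskCommit/commitTask ...
    committer.cleanupJob(context);  // done once, when the job finishes
  }
}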

+ 0 - 5
src/mapred/org/apache/hadoop/mapred/MRConstants.java

@@ -61,10 +61,5 @@ interface MRConstants {
    */
   public static final String RAW_MAP_OUTPUT_LENGTH = "Raw-Map-Output-Length";
 
-  /**
-   * Temporary directory name 
-   */
-  public static final String TEMP_DIR_NAME = "_temporary";
-  
   public static final String WORKDIR = "work";
 }

+ 7 - 1
src/mapred/org/apache/hadoop/mapred/MapTask.java

@@ -31,7 +31,6 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.util.ArrayList;
-import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
@@ -266,6 +265,13 @@ class MapTask extends Task {
     // start thread that will handle communication with parent
     startCommunicationThread(umbilical);
 
+    initialize(job, reporter);
+    // check if it is a cleanupJobTask
+    if (cleanupJob) {
+      runCleanup(umbilical);
+      return;
+    }
+
     int numReduceTasks = conf.getNumReduceTasks();
     LOG.info("numReduceTasks: " + numReduceTasks);
     MapOutputCollector collector = null;

+ 117 - 0
src/mapred/org/apache/hadoop/mapred/OutputCommitter.java

@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+
+/**
+ * <code>OutputCommitter</code> describes the commit of task output for a 
+ * Map-Reduce job.
+ *
+ * <p>The Map-Reduce framework relies on the <code>OutputCommitter</code> of 
+ * the job to:</p>
+ * <ol>
+ *   <li>
+ *   Set up the job during initialization; for example, create the
+ *   temporary output directory for the job.
+ *   The job client does the setup for the job.
+ *   </li>
+ *   <li>
+ *   Clean up the job after it completes; for example, remove the
+ *   temporary output directory. Job cleanup is done
+ *   by a separate task at the end of the job.
+ *   </li>
+ *   <li>
+ *   Set up the task's temporary output.
+ *   </li> 
+ *   <li>
+ *   Check whether a task needs a commit, so the commit
+ *   procedure can be skipped when it does not.
+ *   </li>
+ *   <li>
+ *   Commit the task's output.
+ *   </li>  
+ *   <li>
+ *   Discard the task's output.
+ *   </li>
+ * </ol>
+ * 
+ * @see FileOutputCommitter 
+ * @see JobContext
+ * @see TaskAttemptContext 
+ *
+ */
+public abstract class OutputCommitter {
+  /**
+   * Sets up the job's output during job initialization.
+   * 
+   * The job client does the setup for the job.
+   *   
+   * @param jobContext Context of the job whose output is being written.
+   * @throws IOException if temporary output could not be created
+   */
+  public abstract void setupJob(JobContext jobContext) throws IOException;
+
+  /**
+   * Cleans up the job's output after the job completes.
+   * 
+   * @param jobContext Context of the job whose output is being written.
+   * @throws IOException
+   */
+  public abstract void cleanupJob(JobContext jobContext) throws IOException;
+
+  /**
+   * Sets up output for the task.
+   * 
+   * @param taskContext Context of the task whose output is being written.
+   * @throws IOException
+   */
+  public abstract void setupTask(TaskAttemptContext taskContext)
+  throws IOException;
+  
+  /**
+   * Check whether the task needs a commit.
+   * 
+   * @param taskContext
+   * @return true if the task's output needs to be committed
+   * @throws IOException
+   */
+  public abstract boolean needsTaskCommit(TaskAttemptContext taskContext)
+  throws IOException;
+
+  /**
+   * Promote the task's temporary output to its final location.
+   * 
+   * The task's output is moved to the job's output directory.
+   * 
+   * @param taskContext Context of the task whose output is being written.
+   * @throws IOException if the commit fails
+   */
+  public abstract void commitTask(TaskAttemptContext taskContext)
+  throws IOException;
+  
+  /**
+   * Discard the task's output.
+   * 
+   * @param taskContext
+   * @throws IOException
+   */
+  public abstract void abortTask(TaskAttemptContext taskContext)
+  throws IOException;
+}
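
To make the contract concrete, a deliberately trivial committer: it skips the per-task handshake entirely and only drops a job-level marker on cleanup. The class, the _DONE marker, and the assumption that JobContext exposes getJobConf() (as TaskAttemptContext does) are illustrative, not part of this patch.

package org.apache.hadoop.mapred;

import java.io.IOException;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class MarkerOutputCommitter extends OutputCommitter {
  public void setupJob(JobContext context) throws IOException {
    // nothing to prepare; FileOutputCommitter would create its temp dir here
  }
  public void cleanupJob(JobContext context) throws IOException {
    JobConf conf = context.getJobConf();
    Path out = FileOutputFormat.getOutputPath(conf);
    if (out != null) {
      FileSystem fs = out.getFileSystem(conf);
      fs.create(new Path(out, "_DONE")).close();  // job-level marker
    }
  }
  public void setupTask(TaskAttemptContext context) throws IOException {
    // tasks write straight to their final location in this sketch
  }
  public boolean needsTaskCommit(TaskAttemptContext context) {
    return false;  // skip the COMMIT_PENDING handshake entirely
  }
  public void commitTask(TaskAttemptContext context) { }
  public void abortTask(TaskAttemptContext context) { }
}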

+ 8 - 2
src/mapred/org/apache/hadoop/mapred/ReduceTask.java

@@ -69,7 +69,6 @@ import org.apache.hadoop.io.compress.DefaultCodec;
 import org.apache.hadoop.mapred.IFile.*;
 import org.apache.hadoop.mapred.Merger.Segment;
 import org.apache.hadoop.mapred.SortedRanges.SkipRangeIterator;
-import org.apache.hadoop.mapred.Task.Counter;
 import org.apache.hadoop.metrics.MetricsContext;
 import org.apache.hadoop.metrics.MetricsRecord;
 import org.apache.hadoop.metrics.MetricsUtil;
@@ -326,7 +325,15 @@ class ReduceTask extends Task {
 
     // start thread that will handle communication with parent
     startCommunicationThread(umbilical);
+    final Reporter reporter = getReporter(umbilical);
+    initialize(job, reporter);
 
+    // check if it is a cleanupJobTask
+    if (cleanupJob) {
+      runCleanup(umbilical);
+      return;
+    }
+    
     FileSystem lfs = FileSystem.getLocal(job);
     
     // Initialize the codec
@@ -350,7 +357,6 @@ class ReduceTask extends Task {
  
     setPhase(TaskStatus.Phase.SORT); 
 
-    final Reporter reporter = getReporter(umbilical);
     
     // sort the input file
     LOG.info("Initiating final on-disk merge with " + mapFiles.length + 

+ 9 - 0
src/mapred/org/apache/hadoop/mapred/RunningJob.java

@@ -84,6 +84,15 @@ public interface RunningJob {
    */
   public float reduceProgress() throws IOException;
 
+  /**
+   * Get the <i>progress</i> of the job's cleanup-tasks, as a float between 0.0 
+   * and 1.0.  When all cleanup tasks have completed, the function returns 1.0.
+   * 
+   * @return the progress of the job's cleanup-tasks.
+   * @throws IOException
+   */
+  public float cleanupProgress() throws IOException;
+
   /**
    * Check if the job is finished or not. 
    * This is a non-blocking call.
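
A hedged client-side sketch of the new accessor alongside the existing ones; the watcher class is made up for illustration:

package org.apache.hadoop.mapred;

import java.io.IOException;

class CleanupProgressWatcher {
  // Poll the three progress counters once a second until the job finishes.
  static void watch(JobConf conf) throws IOException, InterruptedException {
    JobClient jobClient = new JobClient(conf);
    RunningJob running = jobClient.submitJob(conf);
    while (!running.isComplete()) {
      System.out.printf("map %.0f%% reduce %.0f%% cleanup %.0f%%%n",
          running.mapProgress() * 100,
          running.reduceProgress() * 100,
          running.cleanupProgress() * 100);
      Thread.sleep(1000);
    }
  }
}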

+ 171 - 142
src/mapred/org/apache/hadoop/mapred/Task.java

@@ -21,7 +21,6 @@ package org.apache.hadoop.mapred;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
-import java.net.URI;
 import java.text.NumberFormat;
 import java.util.ArrayList;
 import java.util.Iterator;
@@ -34,8 +33,6 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
-import org.apache.hadoop.fs.ContentSummary;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.Path;
@@ -48,7 +45,6 @@ import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.serializer.Deserializer;
 import org.apache.hadoop.io.serializer.SerializationFactory;
-import org.apache.hadoop.mapred.Counters.Counter;
 import org.apache.hadoop.mapred.IFile.Writer;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.util.Progress;
@@ -110,8 +106,8 @@ abstract class Task implements Writable, Configurable {
   private String jobFile;                         // job configuration file
   private TaskAttemptID taskId;                          // unique, includes job id
   private int partition;                          // id within job
-  TaskStatus taskStatus; 										      // current status of the task
-  private Path taskOutputPath;                    // task-specific output dir
+  TaskStatus taskStatus;                          // current status of the task
+  protected boolean cleanupJob = false;
   
   //failed ranges from previous attempts
   private SortedRanges failedRanges = new SortedRanges();
@@ -125,6 +121,10 @@ abstract class Task implements Writable, Configurable {
   protected JobConf conf;
   protected MapOutputFile mapOutputFile = new MapOutputFile();
   protected LocalDirAllocator lDirAlloc;
+  private final static int MAX_RETRIES = 10;
+  protected JobContext jobContext;
+  protected TaskAttemptContext taskContext;
+  private volatile boolean commitPending = false;
 
   ////////////////////////////////////////////
   // Constructors
@@ -220,6 +220,13 @@ abstract class Task implements Writable, Configurable {
     this.skipping = skipping;
   }
 
+  /**
+   * Marks this task as the job-cleanup task.
+   */
+  public void setCleanupTask() {
+    cleanupJob = true;
+  }
+
   ////////////////////////////////////////////
   // Writable methods
   ////////////////////////////////////////////
@@ -228,48 +235,27 @@ abstract class Task implements Writable, Configurable {
     Text.writeString(out, jobFile);
     taskId.write(out);
     out.writeInt(partition);
-    if (taskOutputPath != null) {
-      Text.writeString(out, taskOutputPath.toString());
-    } else {
-      Text.writeString(out, "");
-    }
     taskStatus.write(out);
     failedRanges.write(out);
     out.writeBoolean(skipping);
+    out.writeBoolean(cleanupJob);
   }
   public void readFields(DataInput in) throws IOException {
     jobFile = Text.readString(in);
     taskId = TaskAttemptID.read(in);
     partition = in.readInt();
-    String outPath = Text.readString(in);
-    if (outPath.length() != 0) {
-      taskOutputPath = new Path(outPath);
-    } else {
-      taskOutputPath = null;
-    }
     taskStatus.readFields(in);
     this.mapOutputFile.setJobId(taskId.getJobID()); 
     failedRanges.readFields(in);
     currentRecIndexIterator = failedRanges.skipRangeIterator();
     currentRecStartIndex = currentRecIndexIterator.next();
     skipping = in.readBoolean();
+    cleanupJob = in.readBoolean();
   }
 
   @Override
   public String toString() { return taskId.toString(); }
 
-  private Path getTaskOutputPath(JobConf conf) {
-    Path p = new Path(FileOutputFormat.getOutputPath(conf), 
-      (MRConstants.TEMP_DIR_NAME + Path.SEPARATOR + "_" + taskId));
-    try {
-      FileSystem fs = p.getFileSystem(conf);
-      return p.makeQualified(fs);
-    } catch (IOException ie) {
-      LOG.warn(StringUtils.stringifyException(ie));
-      return p;
-    }
-  }
-  
   /**
    * Localize the given JobConf to be specific for this task.
    */
@@ -279,12 +265,18 @@ abstract class Task implements Writable, Configurable {
     conf.setBoolean("mapred.task.is.map", isMapTask());
     conf.setInt("mapred.task.partition", partition);
     conf.set("mapred.job.id", taskId.getJobID().toString());
-    
-    // The task-specific output path
-    if (FileOutputFormat.getOutputPath(conf) != null) {
-      taskOutputPath = getTaskOutputPath(conf);
-      FileOutputFormat.setWorkOutputPath(conf, taskOutputPath);
+    Path outputPath = FileOutputFormat.getOutputPath(conf);
+    if (outputPath != null) {
+      OutputCommitter committer = conf.getOutputCommitter();
+      if ((committer instanceof FileOutputCommitter)) {
+        TaskAttemptContext context = new TaskAttemptContext(conf, taskId);
+        FileOutputFormat.setWorkOutputPath(conf, 
+          ((FileOutputCommitter)committer).getTempTaskOutputPath(context));
+      } else {
+        FileOutputFormat.setWorkOutputPath(conf, outputPath);
+      }
     }
+
   }
   
   /** Run this task as a part of the named job.  This method is executed in the
@@ -359,8 +351,17 @@ abstract class Task implements Writable, Configurable {
               if (sendProgress) {
                 // we need to send progress update
                 updateCounters();
-                taskStatus.statusUpdate(taskProgress.get(), taskProgress.toString(), 
-                        counters);
+                if (commitPending) {
+                  taskStatus.statusUpdate(TaskStatus.State.COMMIT_PENDING,
+                                          taskProgress.get(),
+                                          taskProgress.toString(), 
+                                          counters);
+                } else {
+                  taskStatus.statusUpdate(TaskStatus.State.RUNNING,
+                                          taskProgress.get(),
+                                          taskProgress.toString(), 
+                                          counters);
+                }
                 taskFound = umbilical.statusUpdate(taskId, taskStatus);
                 taskStatus.clearStatus();
               }
@@ -396,6 +397,13 @@ abstract class Task implements Writable, Configurable {
     LOG.debug(getTaskID() + " Progress/ping thread started");
   }
 
+  public void initialize(JobConf job, Reporter reporter) 
+  throws IOException {
+    jobContext = new JobContext(job, reporter);
+    taskContext = new TaskAttemptContext(job, taskId, reporter);
+    OutputCommitter committer = conf.getOutputCommitter();
+    committer.setupTask(taskContext);
+  }
   
   protected Reporter getReporter(final TaskUmbilicalProtocol umbilical) 
     throws IOException 
@@ -543,54 +551,86 @@ abstract class Task implements Writable, Configurable {
   }
 
   public void done(TaskUmbilicalProtocol umbilical) throws IOException {
-    int retries = 10;
-    boolean needProgress = true;
+    LOG.info("Task:" + taskId + " is done."
+             + " And is in the process of commiting");
     updateCounters();
+
+    OutputCommitter outputCommitter = conf.getOutputCommitter();
+    // check whether the commit is required.
+    boolean commitRequired = outputCommitter.needsTaskCommit(taskContext);
+    if (commitRequired) {
+      int retries = MAX_RETRIES;
+      taskStatus.setRunState(TaskStatus.State.COMMIT_PENDING);
+      commitPending = true;
+      // tell the tasktracker that the task is commit pending
+      while (true) {
+        try {
+          umbilical.commitPending(taskId, taskStatus);
+          break;
+        } catch (InterruptedException ie) {
+          // ignore
+        } catch (IOException ie) {
+          LOG.warn("Failure sending commit pending: " + 
+                    StringUtils.stringifyException(ie));
+          if (--retries == 0) {
+            System.exit(67);
+          }
+        }
+      }
+      //wait for commit approval and commit
+      commit(umbilical, outputCommitter);
+    }
     taskDone.set(true);
+    sendLastUpdate(umbilical);
+    //signal the tasktracker that we are done
+    sendDone(umbilical);
+  }
+
+  private void sendLastUpdate(TaskUmbilicalProtocol umbilical) 
+  throws IOException {
+    //loop until the final status update is delivered
+    int retries = MAX_RETRIES;
     while (true) {
       try {
-        if (needProgress) {
-          // send a final status report
-          taskStatus.statusUpdate(taskProgress.get(), taskProgress.toString(), 
+        // send a final status report
+        if (commitPending) {
+          taskStatus.statusUpdate(TaskStatus.State.COMMIT_PENDING,
+                                  taskProgress.get(),
+                                  taskProgress.toString(), 
+                                  counters);
+        } else {
+          taskStatus.statusUpdate(TaskStatus.State.RUNNING,
+                                  taskProgress.get(),
+                                  taskProgress.toString(), 
                                   counters);
-          try {
-            if (!umbilical.statusUpdate(getTaskID(), taskStatus)) {
-              LOG.warn("Parent died.  Exiting "+taskId);
-              System.exit(66);
-            }
-            taskStatus.clearStatus();
-            needProgress = false;
-          } catch (InterruptedException ie) {
-            Thread.currentThread().interrupt();       // interrupt ourself
-          }
         }
-        // Check whether there is any task output
-        boolean shouldBePromoted = false;
+
         try {
-          if (taskOutputPath != null) {
-            // Get the file-system for the task output directory
-            FileSystem fs = taskOutputPath.getFileSystem(conf);
-            if (fs.exists(taskOutputPath)) {
-              // Get the summary for the folder
-              ContentSummary summary = fs.getContentSummary(taskOutputPath);
-              // Check if the directory contains data to be promoted
-              // i.e total-files + total-folders - 1(itself)
-              if (summary != null 
-                  && (summary.getFileCount() + summary.getDirectoryCount() - 1)
-                      > 0) {
-                shouldBePromoted = true;
-              }
-            } else {
-              LOG.info(getTaskID() + ": No outputs to promote from " + 
-                       taskOutputPath);
-            }
+          if (!umbilical.statusUpdate(getTaskID(), taskStatus)) {
+            LOG.warn("Parent died.  Exiting "+taskId);
+            System.exit(66);
           }
-        } catch (IOException ioe) {
-          // To be safe in case of an exception
-          shouldBePromoted = true;
+          taskStatus.clearStatus();
+          return;
+        } catch (InterruptedException ie) {
+          Thread.currentThread().interrupt(); // interrupt ourself
         }
-        umbilical.done(taskId, shouldBePromoted);
-        LOG.info("Task '" + getTaskID() + "' done.");
+      } catch (IOException ie) {
+        LOG.warn("Failure sending last status update: " + 
+                  StringUtils.stringifyException(ie));
+        if (--retries == 0) {
+          throw ie;
+        }
+      }
+    }
+  }
+
+  private void sendDone(TaskUmbilicalProtocol umbilical) throws IOException {
+    int retries = MAX_RETRIES;
+    while (true) {
+      try {
+        umbilical.done(getTaskID());
+        LOG.info("Task '" + taskId + "' done.");
         return;
       } catch (IOException ie) {
         LOG.warn("Failure signalling completion: " + 
@@ -601,15 +641,66 @@ abstract class Task implements Writable, Configurable {
       }
     }
   }
+
+  private void commit(TaskUmbilicalProtocol umbilical,
+                      OutputCommitter committer) throws IOException {
+    int retries = MAX_RETRIES;
+    while (true) {
+      try {
+        while (!umbilical.canCommit(taskId)) {
+          try {
+            Thread.sleep(1000);
+          } catch(InterruptedException ie) {
+            //ignore
+          }
+          setProgressFlag();
+        }
+        // the task can commit now
+        try {
+          LOG.info("Task " + taskId + " is allowed to commit now");
+          committer.commitTask(taskContext);
+          return;
+        } catch (IOException iee) {
+          LOG.warn("Failure committing: " + 
+                    StringUtils.stringifyException(iee));
+          discardOutput(taskContext, committer);
+          throw iee;
+        }
+      } catch (IOException ie) {
+        LOG.warn("Failure asking whether task can commit: " + 
+            StringUtils.stringifyException(ie));
+        if (--retries == 0) {
+          //if it couldn't commit successfully then delete the output
+          discardOutput(taskContext, committer);
+          System.exit(68);
+        }
+      }
+    }
+  }
+
+  private void discardOutput(TaskAttemptContext taskContext,
+                             OutputCommitter committer) {
+    try {
+      committer.abortTask(taskContext);
+    } catch (IOException ioe)  {
+      LOG.warn("Failure cleaning up: " + 
+               StringUtils.stringifyException(ioe));
+    }
+  }
+
+  protected void runCleanup(TaskUmbilicalProtocol umbilical) 
+  throws IOException {
+    // set phase for this task
+    setPhase(TaskStatus.Phase.CLEANUP);
+    getProgress().setStatus("cleanup");
+    // do the cleanup
+    conf.getOutputCommitter().cleanupJob(jobContext);
+    done(umbilical);
+  }
   
   public void setConf(Configuration conf) {
     if (conf instanceof JobConf) {
       this.conf = (JobConf) conf;
-
-      if (taskId != null && taskOutputPath == null && 
-          FileOutputFormat.getOutputPath(this.conf) != null) {
-        taskOutputPath = getTaskOutputPath(this.conf);
-      }
     } else {
       this.conf = new JobConf(conf);
     }
@@ -632,68 +723,6 @@ abstract class Task implements Writable, Configurable {
     return this.conf;
   }
 
-  /**
-   * Save the task's output on successful completion.
-   * 
-   * @throws IOException
-   */
-  void saveTaskOutput() throws IOException {
-
-    if (taskOutputPath != null) {
-      FileSystem fs = taskOutputPath.getFileSystem(conf);
-      if (fs.exists(taskOutputPath)) {
-        Path jobOutputPath = taskOutputPath.getParent().getParent();
-
-        // Move the task outputs to their final place
-        moveTaskOutputs(fs, jobOutputPath, taskOutputPath);
-
-        // Delete the temporary task-specific output directory
-        if (!fs.delete(taskOutputPath, true)) {
-          LOG.info("Failed to delete the temporary output directory of task: " + 
-                  getTaskID() + " - " + taskOutputPath);
-        }
-        
-        LOG.info("Saved output of task '" + getTaskID() + "' to " + jobOutputPath);
-      }
-    }
-  }
-  
-  private Path getFinalPath(Path jobOutputDir, Path taskOutput) {
-    URI relativePath = taskOutputPath.toUri().relativize(taskOutput.toUri());
-    if (relativePath.getPath().length() > 0) {
-      return new Path(jobOutputDir, relativePath.getPath());
-    } else {
-      return jobOutputDir;
-    }
-  }
-  
-  private void moveTaskOutputs(FileSystem fs, Path jobOutputDir, Path taskOutput) 
-  throws IOException {
-    if (fs.isFile(taskOutput)) {
-      Path finalOutputPath = getFinalPath(jobOutputDir, taskOutput);
-      if (!fs.rename(taskOutput, finalOutputPath)) {
-        if (!fs.delete(finalOutputPath, true)) {
-          throw new IOException("Failed to delete earlier output of task: " + 
-                  getTaskID());
-        }
-        if (!fs.rename(taskOutput, finalOutputPath)) {
-          throw new IOException("Failed to save output of task: " + 
-                  getTaskID());
-        }
-      }
-      LOG.debug("Moved " + taskOutput + " to " + finalOutputPath);
-    } else if(fs.isDirectory(taskOutput)) {
-      FileStatus[] paths = fs.listStatus(taskOutput);
-      Path finalOutputPath = getFinalPath(jobOutputDir, taskOutput);
-      fs.mkdirs(finalOutputPath);
-      if (paths != null) {
-        for (FileStatus path : paths) {
-          moveTaskOutputs(fs, jobOutputDir, path.getPath());
-        }
-      }
-    }
-  }
-
   /**
    * OutputCollector for the combiner.
    */
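
The heart of this file's change is the new task-side handshake spread across done(), sendLastUpdate(), sendDone() and commit(). A condensed restatement of the control flow, with retries, progress flags and error paths deliberately elided (the wrapper method is illustrative, not patch code):

package org.apache.hadoop.mapred;

import java.io.IOException;

class CommitHandshakeSketch {
  static void finishTask(TaskUmbilicalProtocol umbilical,
                         OutputCommitter committer,
                         TaskAttemptID taskId,
                         TaskStatus taskStatus,
                         TaskAttemptContext taskContext)
      throws IOException, InterruptedException {
    if (committer.needsTaskCommit(taskContext)) {
      taskStatus.setRunState(TaskStatus.State.COMMIT_PENDING);
      umbilical.commitPending(taskId, taskStatus);  // 1. announce intent
      while (!umbilical.canCommit(taskId)) {        // 2. await approval,
        Thread.sleep(1000);                         //    relayed by the TT
      }
      committer.commitTask(taskContext);            // 3. promote the output
    }
    umbilical.done(taskId);                         // 4. declare success
  }
}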

+ 56 - 0
src/mapred/org/apache/hadoop/mapred/TaskAttemptContext.java

@@ -0,0 +1,56 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import org.apache.hadoop.util.Progressable;
+
+public class TaskAttemptContext extends JobContext {
+
+  private JobConf conf;
+  private TaskAttemptID taskid;
+  
+  TaskAttemptContext(JobConf conf, TaskAttemptID taskid) {
+    this(conf, taskid, Reporter.NULL);
+  }
+  
+  TaskAttemptContext(JobConf conf, TaskAttemptID taskid,
+                     Progressable progress) {
+    super(conf, progress);
+    this.conf = conf;
+    this.taskid = taskid;
+  }
+  
+  /**
+   * Get the taskAttemptID.
+   *  
+   * @return TaskAttemptID
+   */
+  public TaskAttemptID getTaskAttemptID() {
+    return taskid;
+  }
+  
+  /**
+   * Get the job Configuration.
+   * 
+   * @return JobConf
+   */
+  public JobConf getJobConf() {
+    return conf;
+  }
+
+}
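
A small hedged example of building a context by hand, here to ask a FileOutputCommitter where an attempt's temporary output would land (the attempt string and the probe class are made up; the package-private constructor requires sitting in org.apache.hadoop.mapred):

package org.apache.hadoop.mapred;

import java.io.IOException;

import org.apache.hadoop.fs.Path;

class TempPathProbe {
  static Path tempOutputFor(JobConf conf) throws IOException {
    TaskAttemptID id =
        TaskAttemptID.forName("attempt_200809041234_0001_m_000007_0");
    TaskAttemptContext context = new TaskAttemptContext(conf, id);
    FileOutputCommitter committer = new FileOutputCommitter();
    return committer.getTempTaskOutputPath(context);
  }
}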

+ 44 - 1
src/mapred/org/apache/hadoop/mapred/TaskInProgress.java

@@ -79,6 +79,7 @@ class TaskInProgress {
   private boolean killed = false;
   private volatile SortedRanges failedRanges = new SortedRanges();
   private volatile boolean skipping = false;
+  private boolean cleanup = false; 
    
   // The 'next' usable taskid of this tip
   int nextTaskId = 0;
@@ -107,6 +108,9 @@ class TaskInProgress {
   //list of tasks to kill, <taskid> -> <shouldFail> 
   private TreeMap<TaskAttemptID, Boolean> tasksToKill = new TreeMap<TaskAttemptID, Boolean>();
   
+  //task to commit, <taskattemptid>  
+  private TaskAttemptID taskToCommit;
+  
   private Counters counters = new Counters();
   
 
@@ -164,6 +168,14 @@ class TaskInProgress {
     return partition;
   }    
 
+  public boolean isCleanupTask() {
+    return cleanup;
+  }
+  
+  public void setCleanupTask() {
+    cleanup = true;
+  }
+  
   public boolean isOnlyCommitPending() {
     for (TaskStatus t : taskStatuses.values()) {
       if (t.getRunState() == TaskStatus.State.COMMIT_PENDING) {
@@ -172,6 +184,14 @@ class TaskInProgress {
     }
     return false;
   }
+ 
+  public boolean isCommitPending(TaskAttemptID taskId) {
+    TaskStatus t = taskStatuses.get(taskId);
+    if (t == null) {
+      return false;
+    }
+    return t.getRunState() ==  TaskStatus.State.COMMIT_PENDING;
+  }
   
   /**
    * Initialization common to Map and Reduce
@@ -324,12 +344,31 @@ class TaskInProgress {
                !tasksReportedClosed.contains(taskid)) {
       tasksReportedClosed.add(taskid);
       close = true; 
+    } else if (isCommitPending(taskid) && !shouldCommit(taskid) &&
+               !tasksReportedClosed.contains(taskid)) {
+      tasksReportedClosed.add(taskid);
+      close = true; 
     } else {
       close = tasksToKill.keySet().contains(taskid);
     }   
     return close;
   }
 
+  /**
+   * Designate this task attempt as the one to commit for the tip. 
+   * @param taskid
+   */
+  public void doCommit(TaskAttemptID taskid) {
+    taskToCommit = taskid;
+  }
+
+  /**
+   * Returns whether the task attempt should be committed or not 
+   */
+  public boolean shouldCommit(TaskAttemptID taskid) {
+    return taskToCommit.equals(taskid);
+  }
+
   /**
    * Creates a "status report" for this task.  Includes the
    * task ID and overall status, plus reports for all the
@@ -401,7 +440,8 @@ class TaskInProgress {
       // status update for the same taskid! This is a safety check, 
       // and is addressed better at the TaskTracker to ensure this.
       // @see {@link TaskTracker.transmitHeartbeat()}
-      if ((newState != TaskStatus.State.RUNNING) && 
+      if ((newState != TaskStatus.State.RUNNING && 
+           newState != TaskStatus.State.COMMIT_PENDING ) && 
           (oldState == newState)) {
         LOG.warn("Recieved duplicate status update of '" + newState + 
                  "' for '" + taskid + "' of TIP '" + getTIPId() + "'");
@@ -733,6 +773,9 @@ class TaskInProgress {
     } else {
       t = new ReduceTask(jobFile, taskid, partition, numMaps);
     }
+    if (cleanup) {
+      t.setCleanupTask();
+    }
     t.setConf(conf);
     t.setFailedRanges(failedRanges);
     t.setSkipping(skipping);
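
How doCommit() and shouldCommit() are meant to pair up, as a hedged sketch of the assumed jobtracker-side flow (the helper is illustrative; note that shouldCommit() presumes doCommit() has already designated an attempt for the tip):

package org.apache.hadoop.mapred;

import java.util.List;

class CommitDecisionSketch {
  static void maybeCommit(TaskInProgress tip, TaskAttemptID attemptId,
                          List<TaskTrackerAction> actions) {
    if (tip.isCommitPending(attemptId) && !tip.isComplete()) {
      tip.doCommit(attemptId);  // the first COMMIT_PENDING attempt wins
    }
    if (tip.shouldCommit(attemptId)) {
      actions.add(new CommitTaskAction(attemptId));
    }
  }
}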

+ 1 - 1
src/mapred/org/apache/hadoop/mapred/TaskRunner.java

@@ -436,7 +436,7 @@ abstract class TaskRunner extends Thread {
       }catch(IOException ie){
         LOG.warn("Error releasing caches : Cache files might not have been cleaned up");
       }
-      tracker.reportTaskFinished(t.getTaskID());
+      tracker.reportTaskFinished(t.getTaskID(), false);
     }
   }
 

+ 5 - 3
src/mapred/org/apache/hadoop/mapred/TaskStatus.java

@@ -37,7 +37,7 @@ abstract class TaskStatus implements Writable, Cloneable {
     LogFactory.getLog(TaskStatus.class.getName());
   
   //enumeration for reporting current phase of a task. 
-  public static enum Phase{STARTING, MAP, SHUFFLE, SORT, REDUCE}
+  public static enum Phase{STARTING, MAP, SHUFFLE, SORT, REDUCE, CLEANUP}
 
   // what state is the task in?
   public static enum State {RUNNING, SUCCEEDED, FAILED, UNASSIGNED, KILLED, 
@@ -264,9 +264,11 @@ abstract class TaskStatus implements Writable, Cloneable {
    * @param phase
    * @param counters
    */
-  synchronized void statusUpdate(float progress, String state, 
+  synchronized void statusUpdate(State runState, 
+                                 float progress,
+                                 String state, 
                                  Counters counters) {
-    setRunState(TaskStatus.State.RUNNING);
+    setRunState(runState);
     setProgress(progress);
     setStateString(state);
     setCounters(counters);

+ 59 - 29
src/mapred/org/apache/hadoop/mapred/TaskTracker.java

@@ -28,6 +28,7 @@ import java.net.InetSocketAddress;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -191,6 +192,12 @@ public class TaskTracker
    */  
   private int probe_sample_size = 500;
     
+  /*
+   * A list of task attempts for which a CommitTaskAction has been received 
+   */
+  private List<TaskAttemptID> commitResponses = 
+            Collections.synchronizedList(new ArrayList<TaskAttemptID>());
+
   private ShuffleServerMetrics shuffleServerMetrics;
   /** This class contains the methods that should be used for metrics-reporting
    * the specific metrics for shuffle. The TaskTracker is actually a server for
@@ -407,8 +414,10 @@ public class TaskTracker
     // RPC initialization
     int max = maxCurrentMapTasks > maxCurrentReduceTasks ? 
                        maxCurrentMapTasks : maxCurrentReduceTasks;
+    //set the num handlers to max*2 since canCommit may wait for the duration
+    //of a heartbeat RPC
     this.taskReportServer =
-      RPC.getServer(this, bindAddress, tmpPort, max, false, this.fConf);
+      RPC.getServer(this, bindAddress, tmpPort, 2 * max, false, this.fConf);
     this.taskReportServer.start();
 
     // get the assigned address
@@ -957,6 +966,13 @@ public class TaskTracker
           for(TaskTrackerAction action: actions) {
             if (action instanceof LaunchTaskAction) {
               startNewTask((LaunchTaskAction) action);
+            } else if (action instanceof CommitTaskAction) {
+              CommitTaskAction commitAction = (CommitTaskAction)action;
+              if (!commitResponses.contains(commitAction.getTaskID())) {
+                LOG.info("Received commit task action for " + 
+                          commitAction.getTaskID());
+                commitResponses.add(commitAction.getTaskID());
+              }
             } else {
               tasksToCleanup.put(action);
             }
@@ -1072,7 +1088,8 @@ public class TaskTracker
       
     synchronized (this) {
       for (TaskStatus taskStatus : status.getTaskReports()) {
-        if (taskStatus.getRunState() != TaskStatus.State.RUNNING) {
+        if (taskStatus.getRunState() != TaskStatus.State.RUNNING &&
+            taskStatus.getRunState() != TaskStatus.State.COMMIT_PENDING) {
           if (taskStatus.getIsMap()) {
             mapTotal--;
           } else {
@@ -1188,7 +1205,8 @@ public class TaskTracker
   private synchronized void markUnresponsiveTasks() throws IOException {
     long now = System.currentTimeMillis();
     for (TaskInProgress tip: runningTasks.values()) {
-      if (tip.getRunState() == TaskStatus.State.RUNNING) {
+      if (tip.getRunState() == TaskStatus.State.RUNNING ||
+          tip.getRunState() == TaskStatus.State.COMMIT_PENDING) {
         // Check the per-job timeout interval for tasks;
         // an interval of '0' implies it is never timed-out
         long jobTaskTimeout = tip.getTaskTimeout();
@@ -1308,7 +1326,8 @@ public class TaskTracker
     TaskInProgress killMe = null;
     for (Iterator it = runningTasks.values().iterator(); it.hasNext();) {
       TaskInProgress tip = (TaskInProgress) it.next();
-      if ((tip.getRunState() == TaskStatus.State.RUNNING) &&
+      if ((tip.getRunState() == TaskStatus.State.RUNNING ||
+           tip.getRunState() == TaskStatus.State.COMMIT_PENDING) &&
           !tip.wasKilled) {
                 
         if (killMe == null) {
@@ -1517,7 +1536,6 @@ public class TaskTracker
     private TaskStatus taskStatus; 
     private long taskTimeout;
     private String debugCommand;
-    private boolean shouldPromoteOutput = false;
         
     /**
      */
@@ -1667,7 +1685,8 @@ public class TaskTracker
           "% " + taskStatus.getStateString());
       
       if (this.done || 
-          this.taskStatus.getRunState() != TaskStatus.State.RUNNING) {
+          (this.taskStatus.getRunState() != TaskStatus.State.RUNNING &&
+          this.taskStatus.getRunState() != TaskStatus.State.COMMIT_PENDING)) {
         //make sure we ignore progress messages after a task has 
         //invoked TaskUmbilicalProtocol.done() or if the task has been
         //KILLED/FAILED
@@ -1717,15 +1736,8 @@ public class TaskTracker
     /**
      * The task is reporting that it's done running
      */
-    public synchronized void reportDone(boolean shouldPromote) {
-      TaskStatus.State state = null;
-      this.shouldPromoteOutput = shouldPromote;
-      if (shouldPromote) {
-        state = TaskStatus.State.COMMIT_PENDING;
-      } else {
-        state = TaskStatus.State.SUCCEEDED;
-      }
-      this.taskStatus.setRunState(state);
+    public synchronized void reportDone() {
+      this.taskStatus.setRunState(TaskStatus.State.SUCCEEDED);
       this.taskStatus.setProgress(1.0f);
       this.taskStatus.setFinishTime(System.currentTimeMillis());
       this.done = true;
@@ -1758,13 +1770,7 @@ public class TaskTracker
       //
       boolean needCleanup = false;
       synchronized (this) {
-        if (done) {
-          if (shouldPromoteOutput) {
-            taskStatus.setRunState(TaskStatus.State.COMMIT_PENDING);
-          } else {
-            taskStatus.setRunState(TaskStatus.State.SUCCEEDED);
-          }
-        } else {
+        if (!done) {
           if (!wasKilled) {
             failures += 1;
             taskStatus.setRunState(TaskStatus.State.FAILED);
@@ -1960,7 +1966,8 @@ public class TaskTracker
     public void jobHasFinished(boolean wasFailure) throws IOException {
       // Kill the task if it is still running
       synchronized(this){
-        if (getRunState() == TaskStatus.State.RUNNING) {
+        if (getRunState() == TaskStatus.State.RUNNING ||
+            getRunState() == TaskStatus.State.COMMIT_PENDING) {
           kill(wasFailure);
         }
       }
@@ -1974,7 +1981,8 @@ public class TaskTracker
      * @param wasFailure was it a failure (versus a kill request)?
      */
     public synchronized void kill(boolean wasFailure) throws IOException {
-      if (taskStatus.getRunState() == TaskStatus.State.RUNNING) {
+      if (taskStatus.getRunState() == TaskStatus.State.RUNNING ||
+          taskStatus.getRunState() == TaskStatus.State.COMMIT_PENDING) {
         wasKilled = true;
         if (wasFailure) {
           failures += 1;
@@ -2135,14 +2143,34 @@ public class TaskTracker
     return tasks.get(taskid) != null;
   }
 
+  /**
+   * The task reports that it is in COMMIT_PENDING
+   * and is waiting for the commit response
+   */
+  public synchronized void commitPending(TaskAttemptID taskid,
+                                         TaskStatus taskStatus) 
+  throws IOException {
+    LOG.info("Task " + taskid + " is in COMMIT_PENDING");
+    statusUpdate(taskid, taskStatus);
+    reportTaskFinished(taskid, true);
+  }
+  
+  /**
+   * Child checking whether it can commit 
+   */
+  public synchronized boolean canCommit(TaskAttemptID taskid) {
+    return commitResponses.contains(taskid); //don't remove it now
+  }
+  
   /**
    * The task is done.
    */
-  public synchronized void done(TaskAttemptID taskid, boolean shouldPromote) 
+  public synchronized void done(TaskAttemptID taskid) 
   throws IOException {
     TaskInProgress tip = tasks.get(taskid);
+    commitResponses.remove(taskid);
     if (tip != null) {
-      tip.reportDone(shouldPromote);
+      tip.reportDone();
     } else {
       LOG.warn("Unknown child task done: "+taskid+". Ignored.");
     }
@@ -2196,13 +2224,15 @@ public class TaskTracker
   /**
    * The task is no longer running.  It may not have completed successfully
    */
-  void reportTaskFinished(TaskAttemptID taskid) {
+  void reportTaskFinished(TaskAttemptID taskid, boolean commitPending) {
     TaskInProgress tip;
     synchronized (this) {
       tip = tasks.get(taskid);
     }
     if (tip != null) {
-      tip.taskFinished();
+      if (!commitPending) {
+        tip.taskFinished();
+      }
       synchronized(finishedCount) {
         finishedCount[0]++;
         finishedCount.notify();
@@ -2331,7 +2361,7 @@ public class TaskTracker
       TaskStatus status = tip.getStatus();
       status.setIncludeCounters(sendCounters);
       status.setOutputSize(tryToGetOutputSize(status.getTaskID(), fConf));
-      // send counters for finished or failed tasks.
+      // send counters for finished or failed tasks and commit pending tasks
       if (status.getRunState() != TaskStatus.State.RUNNING) {
         status.setIncludeCounters(true);
       }

+ 9 - 1
src/mapred/org/apache/hadoop/mapred/TaskTrackerAction.java

@@ -48,7 +48,10 @@ abstract class TaskTrackerAction implements Writable {
     KILL_JOB,
     
     /** Reinitialize the tasktracker. */
-    REINIT_TRACKER
+    REINIT_TRACKER,
+
+    /** Ask a task to save its output. */
+    COMMIT_TASK
   };
   
   /**
@@ -80,6 +83,11 @@ abstract class TaskTrackerAction implements Writable {
         action = new ReinitTrackerAction();
       }
       break;
+    case COMMIT_TASK:
+      {
+        action = new CommitTaskAction();
+      }
+      break;
     }
 
     return action;
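The new COMMIT_TASK case constructs CommitTaskAction, a file added by this commit but not shown in this excerpt. As a hedged sketch, such an action needs little more than the attempt id to commit plus the usual Writable hooks; the body below illustrates that shape and is not the committed file (it also assumes the action enum is named ActionType):

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;

    class CommitTaskAction extends TaskTrackerAction {
      private TaskAttemptID taskId;

      public CommitTaskAction() {
        super(ActionType.COMMIT_TASK);
        taskId = new TaskAttemptID();   // empty id, populated by readFields()
      }

      public CommitTaskAction(TaskAttemptID taskId) {
        super(ActionType.COMMIT_TASK);
        this.taskId = taskId;
      }

      public TaskAttemptID getTaskID() {
        return taskId;
      }

      public void write(DataOutput out) throws IOException {
        taskId.write(out);
      }

      public void readFields(DataInput in) throws IOException {
        taskId.readFields(in);
      }
    }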

+ 4 - 2
src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java

@@ -207,7 +207,8 @@ class TaskTrackerStatus implements Writable {
       TaskStatus.State state = ts.getRunState();
       if (ts.getIsMap() &&
           ((state == TaskStatus.State.RUNNING) ||
-           (state == TaskStatus.State.UNASSIGNED))) {
+           (state == TaskStatus.State.UNASSIGNED) ||
+           (state == TaskStatus.State.COMMIT_PENDING))) {
         mapCount++;
       }
     }
@@ -224,7 +225,8 @@ class TaskTrackerStatus implements Writable {
       TaskStatus.State state = ts.getRunState();
       if ((!ts.getIsMap()) &&
           ((state == TaskStatus.State.RUNNING) ||  
-           (state == TaskStatus.State.UNASSIGNED))) {
+           (state == TaskStatus.State.UNASSIGNED) ||
+           (state == TaskStatus.State.COMMIT_PENDING))) {
         reduceCount++;
       }
     }
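Both hunks widen the same rule: a COMMIT_PENDING task still occupies its slot, so the schedulers keep counting it against the tracker's capacity. If the duplicated predicate were ever factored out, a helper along these lines (purely illustrative) would state the rule once:

    // A task holds a map/reduce slot from assignment until its output is
    // committed: UNASSIGNED, RUNNING, or COMMIT_PENDING.
    private static boolean occupiesSlot(TaskStatus.State state) {
      return state == TaskStatus.State.UNASSIGNED
          || state == TaskStatus.State.RUNNING
          || state == TaskStatus.State.COMMIT_PENDING;
    }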

+ 21 - 3
src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java

@@ -45,9 +45,10 @@ interface TaskUmbilicalProtocol extends VersionedProtocol {
    * Version 9 changes the counter representation for HADOOP-1915
    * Version 10 changed the TaskStatus format and added reportNextRecordRange
    *            for HADOOP-153
+   * Version 11 adds RPCs for task commit as part of HADOOP-3150
    * */
 
-  public static final long versionID = 10L;
+  public static final long versionID = 11L;
   
   /** Called when a child task process starts, to get its task.*/
   Task getTask(TaskAttemptID taskid) throws IOException;
@@ -88,9 +89,26 @@ interface TaskUmbilicalProtocol extends VersionedProtocol {
   /** Report that the task is successfully completed.  Failure is assumed if
    * the task process exits without calling this.
    * @param taskid task's id
-   * @param shouldBePromoted whether to promote the task's output or not 
    */
-  void done(TaskAttemptID taskid, boolean shouldBePromoted) throws IOException;
+  void done(TaskAttemptID taskid) throws IOException;
+  
+  /** 
+   * Report that the task is complete, but its commit is pending.
+   * 
+   * @param taskId task's id
+   * @param taskStatus status of the child
+   * @throws IOException
+   */
+  void commitPending(TaskAttemptID taskId, TaskStatus taskStatus) 
+  throws IOException, InterruptedException;  
+
+  /**
+   * Polled by the child to ask whether it can go ahead with the commit.
+   * @param taskid the id of the task awaiting commit
+   * @return true if the task's output can be committed
+   * @throws IOException
+   */
+  boolean canCommit(TaskAttemptID taskid) throws IOException;
 
   /** Report that a reduce-task couldn't shuffle map-outputs.*/
   void shuffleError(TaskAttemptID taskId, String message) throws IOException;
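Taken together, commitPending(), canCommit(), and done() form a three-step handshake driven from the child. A minimal sketch of that flow, assuming an umbilical proxy, an OutputCommitter, and a one-second polling period are in scope (all illustrative):

    // 1. Announce that output is ready and the task awaits a commit grant.
    umbilical.commitPending(taskId, taskStatus);

    // 2. Poll until the TaskTracker relays the JobTracker's go-ahead.
    while (!umbilical.canCommit(taskId)) {
      Thread.sleep(1000);               // assumed polling period
    }

    // 3. Save the output, then report success; exiting without calling
    //    done() is treated as a failure.
    committer.commitTask(taskContext);
    umbilical.done(taskId);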

+ 93 - 0
src/test/org/apache/hadoop/mapred/TestFileOutputCommitter.java

@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.*;
+import junit.framework.TestCase;
+
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.io.*;
+
+public class TestFileOutputCommitter extends TestCase {
+  private static Path outDir = new Path(
+     System.getProperty("test.build.data", "."), "output");
+
+  // A random task attempt id for testing.
+  private static String attempt = "attempt_200707121733_0001_m_000000_0";
+  private static TaskAttemptID taskID = TaskAttemptID.forName(attempt);
+
+  @SuppressWarnings("unchecked")
+  public void testCommitter() throws Exception {
+    JobConf job = new JobConf();
+    job.set("mapred.task.id", attempt);
+    job.setOutputCommitter(FileOutputCommitter.class);
+    JobContext jContext = new JobContext(job);
+    TaskAttemptContext tContext = new TaskAttemptContext(job, taskID);
+    FileOutputFormat.setOutputPath(job, outDir);
+    FileOutputCommitter committer = new FileOutputCommitter();
+    FileOutputFormat.setWorkOutputPath(job, 
+      committer.getTempTaskOutputPath(tContext));
+
+    committer.setupJob(jContext);
+    committer.setupTask(tContext);
+    String file = "test.txt";
+
+    // A reporter that does nothing
+    Reporter reporter = Reporter.NULL;
+    FileSystem localFs = FileSystem.getLocal(job);
+    TextOutputFormat theOutputFormat = new TextOutputFormat();
+    RecordWriter theRecordWriter =
+      theOutputFormat.getRecordWriter(localFs, job, file, reporter);
+    Text key1 = new Text("key1");
+    Text key2 = new Text("key2");
+    Text val1 = new Text("val1");
+    Text val2 = new Text("val2");
+    NullWritable nullWritable = NullWritable.get();
+
+    try {
+      theRecordWriter.write(key1, val1);
+      theRecordWriter.write(null, nullWritable);
+      theRecordWriter.write(null, val1);
+      theRecordWriter.write(nullWritable, val2);
+      theRecordWriter.write(key2, nullWritable);
+      theRecordWriter.write(key1, null);
+      theRecordWriter.write(null, null);
+      theRecordWriter.write(key2, val2);
+    } finally {
+      theRecordWriter.close(reporter);
+    }
+    committer.commitTask(tContext);
+    committer.cleanupJob(jContext);
+    
+    File expectedFile = new File(new Path(outDir, file).toString());
+    StringBuffer expectedOutput = new StringBuffer();
+    expectedOutput.append(key1).append('\t').append(val1).append("\n");
+    expectedOutput.append(val1).append("\n");
+    expectedOutput.append(val2).append("\n");
+    expectedOutput.append(key2).append("\n");
+    expectedOutput.append(key1).append("\n");
+    expectedOutput.append(key2).append('\t').append(val2).append("\n");
+    String output = UtilsForTests.slurp(expectedFile);
+    assertEquals(expectedOutput.toString(), output);
+  }
+
+  public static void main(String[] args) throws Exception {
+    new TestFileOutputCommitter().testCommitter();
+  }
+}
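The test above exercises only the success path. For context, a hedged sketch of the lifecycle a framework driver would run around a task, assuming the OutputCommitter added by this commit also exposes needsTaskCommit() and abortTask() for the failure path:

    committer.setupJob(jContext);        // once per job, before any task runs
    committer.setupTask(tContext);       // once per task attempt
    try {
      // ... the attempt writes its records to the temp task path ...
      if (committer.needsTaskCommit(tContext)) {
        committer.commitTask(tContext);  // promote temp files to the output dir
      }
    } catch (IOException ioe) {
      committer.abortTask(tContext);     // discard this attempt's temp output
      throw ioe;
    }
    committer.cleanupJob(jContext);      // once per job, after all tasks finish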

+ 2 - 0
src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java

@@ -74,6 +74,8 @@ public class TestMiniMRLocalFS extends TestCase {
       assertEquals("number of maps", 1, reports.length);
       reports = client.getReduceTaskReports(jobid);
       assertEquals("number of reduces", 1, reports.length);
+      reports = client.getCleanupTaskReports(jobid);
+      assertEquals("number of cleanups", 2, reports.length);
       Counters counters = ret.job.getCounters();
       assertEquals("number of map inputs", 3, 
                    counters.getCounter(Task.Counter.MAP_INPUT_RECORDS));

+ 5 - 2
src/test/org/apache/hadoop/mapred/TestMultipleTextOutputFormat.java

@@ -38,12 +38,14 @@ public class TestMultipleTextOutputFormat extends TestCase {
     }
   }
 
+  // A random task attempt id for testing.
+  private static String attempt = "attempt_200707121733_0001_m_000000_0";
+
   private static Path workDir = 
     new Path(new Path(
                       new Path(System.getProperty("test.build.data", "."), 
                                "data"), 
-                      MRConstants.TEMP_DIR_NAME), 
-             "TestMultipleTextOutputFormat");
+                      FileOutputCommitter.TEMP_DIR_NAME), "_" + attempt);
 
   private static void writeData(RecordWriter<Text, Text> rw) throws IOException {
     for (int i = 10; i < 40; i++) {
@@ -84,6 +86,7 @@ public class TestMultipleTextOutputFormat extends TestCase {
   
   public void testFormat() throws Exception {
     JobConf job = new JobConf();
+    job.set("mapred.task.id", attempt);
     FileOutputFormat.setOutputPath(job, workDir.getParent().getParent());
     FileOutputFormat.setWorkOutputPath(job, workDir);
     FileSystem fs = workDir.getFileSystem(job);
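This test and the other OutputFormat tests below all move their work directory to the committer's per-attempt layout: side files live under the output directory in FileOutputCommitter.TEMP_DIR_NAME (presumably "_temporary"), in a subdirectory named "_" plus the attempt id. Roughly, as a sketch of the convention:

    Path output = FileOutputFormat.getOutputPath(job);
    // e.g. <output>/_temporary/_attempt_200707121733_0001_m_000000_0
    Path workPath = new Path(
        new Path(output, FileOutputCommitter.TEMP_DIR_NAME),
        "_" + taskID.toString());
    FileOutputFormat.setWorkOutputPath(job, workPath);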

+ 7 - 0
src/test/org/apache/hadoop/mapred/TestQueueManager.java

@@ -230,6 +230,13 @@ public class TestQueueManager extends TestCase {
         }
       }
       rjob.killJob();
+      while(rjob.cleanupProgress() == 0.0f) {
+        try {
+          Thread.sleep(10);  
+        } catch (InterruptedException ie) {
+          break;
+        }
+      }
       if (shouldSucceed) {
         assertTrue(rjob.isComplete());
       } else {
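The loop added above spins on the new RunningJob.cleanupProgress() so the kill is fully processed before the assertions run. A hedged variant with an upper bound, in case the cleanup task never starts (the 60-second cap is an assumption):

    long deadline = System.currentTimeMillis() + 60 * 1000;
    while (rjob.cleanupProgress() == 0.0f
           && System.currentTimeMillis() < deadline) {
      try {
        Thread.sleep(10);
      } catch (InterruptedException ie) {
        break;                           // stop waiting if interrupted
      }
    }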

+ 4 - 3
src/test/org/apache/hadoop/mapred/TestSequenceFileAsBinaryOutputFormat.java

@@ -23,7 +23,6 @@ import java.util.Random;
 
 import org.apache.hadoop.fs.*;
 import org.apache.hadoop.io.*;
-import org.apache.hadoop.mapred.*;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
 
 import junit.framework.TestCase;
@@ -34,6 +33,8 @@ public class TestSequenceFileAsBinaryOutputFormat extends TestCase {
       LogFactory.getLog(TestSequenceFileAsBinaryOutputFormat.class.getName());
 
   private static final int RECORDS = 10000;
+  // A random task attempt id for testing.
+  private static final String attempt = "attempt_200707121733_0001_m_000000_0";
 
   public void testBinary() throws IOException {
     JobConf job = new JobConf();
@@ -41,8 +42,7 @@ public class TestSequenceFileAsBinaryOutputFormat extends TestCase {
     
     Path dir = 
       new Path(new Path(new Path(System.getProperty("test.build.data",".")), 
-                        MRConstants.TEMP_DIR_NAME),
-               "mapred");
+                        FileOutputCommitter.TEMP_DIR_NAME), "_" + attempt);
     Path file = new Path(dir, "testbinary.seq");
     Random r = new Random();
     long seed = r.nextLong();
@@ -53,6 +53,7 @@ public class TestSequenceFileAsBinaryOutputFormat extends TestCase {
       fail("Failed to create output directory");
     }
 
+    job.set("mapred.task.id", attempt);
     FileOutputFormat.setOutputPath(job, dir.getParent().getParent());
     FileOutputFormat.setWorkOutputPath(job, dir);
 

+ 5 - 2
src/test/org/apache/hadoop/mapred/TestTextOutputFormat.java

@@ -35,17 +35,19 @@ public class TestTextOutputFormat extends TestCase {
       throw new RuntimeException("init failure", e);
     }
   }
+  // A random task attempt id for testing.
+  private static String attempt = "attempt_200707121733_0001_m_000000_0";
 
   private static Path workDir = 
     new Path(new Path(
                       new Path(System.getProperty("test.build.data", "."), 
                                "data"), 
-                      MRConstants.TEMP_DIR_NAME), 
-             "TestTextOutputFormat");
+                      FileOutputCommitter.TEMP_DIR_NAME), "_" + attempt);
 
   @SuppressWarnings("unchecked")
   public void testFormat() throws Exception {
     JobConf job = new JobConf();
+    job.set("mapred.task.id", attempt);
     FileOutputFormat.setOutputPath(job, workDir.getParent().getParent());
     FileOutputFormat.setWorkOutputPath(job, workDir);
     FileSystem fs = workDir.getFileSystem(job);
@@ -98,6 +100,7 @@ public class TestTextOutputFormat extends TestCase {
     JobConf job = new JobConf();
     String separator = "\u0001";
     job.set("mapred.textoutputformat.separator", separator);
+    job.set("mapred.task.id", attempt);
     FileOutputFormat.setOutputPath(job, workDir.getParent().getParent());
     FileOutputFormat.setWorkOutputPath(job, workDir);
     FileSystem fs = workDir.getFileSystem(job);

+ 38 - 0
src/webapps/job/jobdetails.jsp

@@ -88,6 +88,41 @@
                   ) + 
               "</td></tr>\n");
   }
+
+  private void printCleanupTaskSummary(JspWriter out,
+                                String jobId,
+                                TaskInProgress[] tasks
+                               ) throws IOException {
+    int totalTasks = tasks.length;
+    int runningTasks = 0;
+    int finishedTasks = 0;
+    int killedTasks = 0;
+    String kind = "cleanup";
+    for(int i=0; i < totalTasks; ++i) {
+      TaskInProgress task = tasks[i];
+      if (task.isComplete()) {
+        finishedTasks += 1;
+      } else if (task.isRunning()) {
+        runningTasks += 1;
+      } else if (task.isFailed()) {
+        killedTasks += 1;
+      }
+    }
+    int pendingTasks = totalTasks - runningTasks - killedTasks - finishedTasks; 
+    out.print(((runningTasks > 0)  
+               ? "<a href=\"jobtasks.jsp?jobid=" + jobId + "&type="+ kind + 
+                 "&pagenum=1" + "&state=running\">" + " Running" + 
+                 "</a>" 
+               : ((pendingTasks > 0) ? " Pending" :
+                 ((finishedTasks > 0) 
+               ?"<a href=\"jobtasks.jsp?jobid=" + jobId + "&type="+ kind + 
+                "&pagenum=1" + "&state=completed\">" + " Successful"
+                 + "</a>" 
+               : ((killedTasks > 0) 
+               ?"<a href=\"jobtasks.jsp?jobid=" + jobId + "&type="+ kind +
+                "&pagenum=1" + "&state=killed\">" + " Failed" 
+                + "</a>" : "None")))));
+  }
   
   private void printConfirm(JspWriter out, String jobId) throws IOException{
     String url = "jobdetails.jsp?jobid=" + jobId;
@@ -194,6 +229,9 @@
             job.getFinishTime(), job.getStartTime()) + "<br>\n");
       }
     }
+    out.print("<b>Job Cleanup:</b>");
+    printCleanupTaskSummary(out, jobId, job.getCleanupTasks());
+    out.print("<br>\n");
     if (flakyTaskTrackers > 0) {
       out.print("<b>Black-listed TaskTrackers:</b> " + 
           "<a href=\"jobblacklistedtrackers.jsp?jobid=" + jobId + "\">" +

+ 4 - 1
src/webapps/job/jobtasks.jsp

@@ -39,9 +39,12 @@
      reports = (job != null) ? tracker.getMapTaskReports(jobidObj) : null;
      tasks = (job != null) ? job.getMapTasks() : null;
     }
-  else{
+  else if ("reduce".equals(type)) {
     reports = (job != null) ? tracker.getReduceTaskReports(jobidObj) : null;
     tasks = (job != null) ? job.getReduceTasks() : null;
+  } else {
+    reports = (job != null) ? tracker.getCleanupTaskReports(jobidObj) : null;
+    tasks = (job != null) ? job.getCleanupTasks() : null;
   }
 %>