
commit 3b1d40f45ba4e129936af01e9dc608d08ed8e3e2
Author: Hemanth Yamijala <yhemanth@apache.org>
Date: Thu Oct 22 20:31:26 2009 +0530

MAPREDUCE:947 from https://issues.apache.org/jira/secure/attachment/12422899/mr-947-y20.patch

+++ b/YAHOO-CHANGES.txt
+ MAPREDUCE-947. Added commitJob and abortJob apis to OutputCommitter.
+ Enhanced FileOutputCommitter to create a _SUCCESS file for successful
+ jobs. (Amar Kamat & Jothi Padmanabhan via acmurthy)
+


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.20-security-patches@1077033 13f79535-47bb-0310-9956-ffa450edef68

Owen O'Malley, 14 years ago (parent commit 5a7948fac2)
42 changed files with 793 additions and 108 deletions
  1. src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestMultipleCachefiles.java (+13, -4)
  2. src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamAggregate.java (+8, -5)
  3. src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamDataProtocol.java (+8, -5)
  4. src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamReduceNone.java (+8, -6)
  5. src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamXmlRecordReader.java (+9, -5)
  6. src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreaming.java (+4, -4)
  7. src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingBadRecords.java (+2, -2)
  8. src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingEmptyInpNonemptyOut.java (+11, -7)
  9. src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingFailure.java (+5, -1)
  10. src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingKeyValue.java (+10, -8)
  11. src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingSeparator.java (+8, -5)
  12. src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestSymLink.java (+15, -5)
  13. src/mapred/org/apache/hadoop/mapred/FileOutputCommitter.java (+45, -1)
  14. src/mapred/org/apache/hadoop/mapred/JobInProgress.java (+14, -0)
  15. src/mapred/org/apache/hadoop/mapred/JobStatus.java (+6, -0)
  16. src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java (+2, -2)
  17. src/mapred/org/apache/hadoop/mapred/OutputCommitter.java (+54, -2)
  18. src/mapred/org/apache/hadoop/mapred/OutputLogFilter.java (+7, -1)
  19. src/mapred/org/apache/hadoop/mapred/Task.java (+41, -1)
  20. src/mapred/org/apache/hadoop/mapred/Utils.java (+60, -0)
  21. src/mapred/org/apache/hadoop/mapreduce/JobStatus.java (+45, -0)
  22. src/mapred/org/apache/hadoop/mapreduce/OutputCommitter.java (+29, -2)
  23. src/mapred/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java (+50, -2)
  24. src/mapred/org/apache/hadoop/mapreduce/lib/output/NullOutputFormat.java (+1, -0)
  25. src/test/findbugsExcludeFile.xml (+21, -1)
  26. src/test/mapred-site.xml (+4, -1)
  27. src/test/org/apache/hadoop/conf/TestNoDefaultsJobConf.java (+2, -2)
  28. src/test/org/apache/hadoop/mapred/TestBadRecords.java (+1, -1)
  29. src/test/org/apache/hadoop/mapred/TestClusterMapReduceTestCase.java (+2, -2)
  30. src/test/org/apache/hadoop/mapred/TestEmptyJob.java (+4, -3)
  31. src/test/org/apache/hadoop/mapred/TestFileOutputCommitter.java (+1, -1)
  32. src/test/org/apache/hadoop/mapred/TestJavaSerialization.java (+2, -2)
  33. src/test/org/apache/hadoop/mapred/TestJobCleanup.java (+267, -0)
  34. src/test/org/apache/hadoop/mapred/TestJobName.java (+4, -2)
  35. src/test/org/apache/hadoop/mapred/TestMiniMRClasspath.java (+3, -2)
  36. src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java (+2, -1)
  37. src/test/org/apache/hadoop/mapred/TestSetupAndCleanupFailure.java (+4, -4)
  38. src/test/org/apache/hadoop/mapred/TestUserDefinedCounters.java (+1, -1)
  39. src/test/org/apache/hadoop/mapred/join/TestDatamerge.java (+4, -2)
  40. src/test/org/apache/hadoop/mapred/lib/TestKeyFieldBasedComparator.java (+7, -3)
  41. src/test/org/apache/hadoop/mapred/pipes/TestPipes.java (+7, -7)
  42. src/test/org/apache/hadoop/mapreduce/MapReduceTestUtil.java (+2, -5)

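Before the per-file diffs, a minimal sketch of how a downstream consumer might use the _SUCCESS marker this commit introduces. The marker file name and the configuration key come from the patch; the class, method, and variable names here are purely illustrative.

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class SuccessMarkerCheck {
  // Returns true if the job that wrote outputDir committed successfully.
  // FileOutputCommitter creates the _SUCCESS file in commitJob() unless
  // mapreduce.fileoutputcommitter.marksuccessfuljobs is set to false.
  static boolean jobSucceeded(Configuration conf, Path outputDir)
      throws IOException {
    FileSystem fs = outputDir.getFileSystem(conf);
    return fs.exists(new Path(outputDir, "_SUCCESS"));
  }
}
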
+ 13 - 4
src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestMultipleCachefiles.java

@@ -18,15 +18,23 @@
 
 package org.apache.hadoop.streaming;
 
+import java.io.BufferedReader;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+
 import junit.framework.TestCase;
-import java.io.*;
-import java.util.*;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.*;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MiniMRCluster;
+import org.apache.hadoop.mapred.Utils;
 /**
  * This test case tests the symlink creation
  * utility provided by distributed caching 
@@ -113,7 +121,8 @@ public class TestMultipleCachefiles extends TestCase
         String line2 = null;
         Path[] fileList = FileUtil.stat2Paths(fileSys.listStatus(
                                      new Path(OUTPUT_DIR),
-                                     new OutputLogFilter()));
+                                     new Utils.OutputFileUtils
+                                       .OutputFilesFilter()));
         for (int i = 0; i < fileList.length; i++){
           System.out.println(fileList[i].toString());
           BufferedReader bread =

+ 8 - 5
src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamAggregate.java

@@ -23,6 +23,7 @@ import java.io.*;
 import java.util.*;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 
 /**
@@ -74,7 +75,7 @@ public class TestStreamAggregate extends TestCase
   {
     try {
       try {
-        OUTPUT_DIR.getAbsoluteFile().delete();
+        FileUtil.fullyDelete(OUTPUT_DIR.getAbsoluteFile());
       } catch (Exception e) {
       }
 
@@ -94,10 +95,12 @@ public class TestStreamAggregate extends TestCase
     } catch(Exception e) {
       failTrace(e);
     } finally {
-      File outFileCRC = new File(OUTPUT_DIR, ".part-00000.crc").getAbsoluteFile();
-      INPUT_FILE.delete();
-      outFileCRC.delete();
-      OUTPUT_DIR.getAbsoluteFile().delete();
+      try {
+        INPUT_FILE.delete();
+        FileUtil.fullyDelete(OUTPUT_DIR.getAbsoluteFile());
+      } catch(Exception e) {
+        failTrace(e);
+      }
     }
   }
 

+ 8 - 5
src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamDataProtocol.java

@@ -23,6 +23,7 @@ import java.io.*;
 import java.util.*;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner;
 
@@ -82,7 +83,7 @@ public class TestStreamDataProtocol extends TestCase
   {
     try {
       try {
-        OUTPUT_DIR.getAbsoluteFile().delete();
+        FileUtil.fullyDelete(OUTPUT_DIR.getAbsoluteFile());
       } catch (Exception e) {
       }
 
@@ -103,10 +104,12 @@ public class TestStreamDataProtocol extends TestCase
     } catch(Exception e) {
       failTrace(e);
     } finally {
-      File outFileCRC = new File(OUTPUT_DIR, ".part-00000.crc").getAbsoluteFile();
-      INPUT_FILE.delete();
-      outFileCRC.delete();
-      OUTPUT_DIR.getAbsoluteFile().delete();
+      try {
+        INPUT_FILE.delete();
+        FileUtil.fullyDelete(OUTPUT_DIR.getAbsoluteFile());
+      } catch(Exception e) {
+        failTrace(e);
+      }
     }
   }
 

+ 8 - 6
src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamReduceNone.java

@@ -23,6 +23,7 @@ import java.io.*;
 import java.util.*;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 
 /**
@@ -75,7 +76,7 @@ public class TestStreamReduceNone extends TestCase
     File outFile = null;
     try {
       try {
-        OUTPUT_DIR.getAbsoluteFile().delete();
+        FileUtil.fullyDelete(OUTPUT_DIR.getAbsoluteFile());
       } catch (Exception e) {
       }
 
@@ -94,11 +95,12 @@ public class TestStreamReduceNone extends TestCase
     } catch(Exception e) {
       failTrace(e);
     } finally {
-      outFile.delete();
-      File outFileCRC = new File(OUTPUT_DIR, "."+outFileName+".crc").getAbsoluteFile();
-      INPUT_FILE.delete();
-      outFileCRC.delete();
-      OUTPUT_DIR.getAbsoluteFile().delete();
+      try {
+        INPUT_FILE.delete();
+        FileUtil.fullyDelete(OUTPUT_DIR.getAbsoluteFile());
+      } catch(Exception e) {
+        failTrace(e);
+      }
     }
   }
 

+ 9 - 5
src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamXmlRecordReader.java

@@ -22,6 +22,8 @@ import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
 
+import org.apache.hadoop.fs.FileUtil;
+
 /**
  * This class tests StreamXmlRecordReader
  * The test creates an XML file, uses StreamXmlRecordReader and compares
@@ -61,7 +63,7 @@ public class TestStreamXmlRecordReader extends TestStreaming
   public void testCommandLine() {
     try {
       try {
-        OUTPUT_DIR.getAbsoluteFile().delete();
+        FileUtil.fullyDelete(OUTPUT_DIR.getAbsoluteFile());
       } catch (Exception e) {
       }
       createInput();
@@ -74,10 +76,12 @@ public class TestStreamXmlRecordReader extends TestStreaming
     } catch (Exception e) {
       e.printStackTrace();
     } finally {
-      INPUT_FILE.delete();
-      File outFileCRC = new File(OUTPUT_DIR, ".part-00000.crc").getAbsoluteFile();
-      outFileCRC.delete();
-      OUTPUT_DIR.getAbsoluteFile().delete();
+      try {
+        INPUT_FILE.delete();
+        FileUtil.fullyDelete(OUTPUT_DIR.getAbsoluteFile());
+      } catch(Exception e) {
+        e.printStackTrace();
+      }
     }
   }
 

+ 4 - 4
src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreaming.java

@@ -21,6 +21,8 @@ package org.apache.hadoop.streaming;
 import junit.framework.TestCase;
 import java.io.*;
 
+import org.apache.hadoop.fs.FileUtil;
+
 /**
  * This class tests hadoopStreaming in MapReduce local mode.
  */
@@ -73,7 +75,7 @@ public class TestStreaming extends TestCase
   {
     try {
       try {
-        OUTPUT_DIR.getAbsoluteFile().delete();
+        FileUtil.fullyDelete(OUTPUT_DIR.getAbsoluteFile());
       } catch (Exception e) {
       }
 
@@ -91,10 +93,8 @@ public class TestStreaming extends TestCase
       System.err.println("  out1=" + output);
       assertEquals(outputExpect, output);
     } finally {
-      File outFileCRC = new File(OUTPUT_DIR, ".part-00000.crc").getAbsoluteFile();
       INPUT_FILE.delete();
-      outFileCRC.delete();
-      OUTPUT_DIR.getAbsoluteFile().delete();
+      FileUtil.fullyDelete(OUTPUT_DIR.getAbsoluteFile());
     }
   }
 

+ 2 - 2
src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingBadRecords.java

@@ -37,9 +37,9 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.ClusterMapReduceTestCase;
 import org.apache.hadoop.mapred.Counters;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.OutputLogFilter;
 import org.apache.hadoop.mapred.RunningJob;
 import org.apache.hadoop.mapred.SkipBadRecords;
+import org.apache.hadoop.mapred.Utils;
 
 public class TestStreamingBadRecords extends ClusterMapReduceTestCase
 {
@@ -118,7 +118,7 @@ public class TestStreamingBadRecords extends ClusterMapReduceTestCase
     badRecs.addAll(REDUCER_BAD_RECORDS);
     Path[] outputFiles = FileUtil.stat2Paths(
         getFileSystem().listStatus(getOutputDir(),
-        new OutputLogFilter()));
+            new Utils.OutputFileUtils.OutputFilesFilter()));
     
     if (outputFiles.length > 0) {
       InputStream is = getFileSystem().open(outputFiles[0]);

+ 11 - 7
src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingEmptyInpNonemptyOut.java

@@ -21,6 +21,8 @@ package org.apache.hadoop.streaming;
 import junit.framework.TestCase;
 import java.io.*;
 
+import org.apache.hadoop.fs.FileUtil;
+
 /**
  * This class tests hadoopStreaming in MapReduce local mode by giving
  * empty input to mapper and the mapper generates nonempty output. Since map()
@@ -77,7 +79,7 @@ public class TestStreamingEmptyInpNonemptyOut extends TestCase
   {
     try {
       try {
-        OUTPUT_DIR.getAbsoluteFile().delete();
+        FileUtil.fullyDelete(OUTPUT_DIR.getAbsoluteFile());
       } catch (Exception e) {
       }
 
@@ -100,12 +102,14 @@ public class TestStreamingEmptyInpNonemptyOut extends TestCase
       outFile = new File(OUTPUT_DIR, "part-00000").getAbsoluteFile();
       outFile.delete();
     } finally {
-      File outFileCRC = new File(OUTPUT_DIR, ".part-00000.crc").getAbsoluteFile();
-      INPUT_FILE.delete();
-      SCRIPT_FILE.delete();
-      outFileCRC.delete();
-      OUTPUT_DIR.getAbsoluteFile().delete();
-    }
+      try {
+        INPUT_FILE.delete();
+        SCRIPT_FILE.delete();
+        FileUtil.fullyDelete(OUTPUT_DIR.getAbsoluteFile());
+      } catch(Exception e) {
+        e.printStackTrace();
+      }
+   }
   }
 
   public static void main(String[]args) throws Exception

+ 5 - 1
src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingFailure.java

@@ -75,7 +75,11 @@ public class TestStreamingFailure extends TestStreaming
     } catch(Exception e) {
       // Expecting an exception
     } finally {
-      OUTPUT_DIR.getAbsoluteFile().delete();
+      try {
+        FileUtil.fullyDelete(OUTPUT_DIR.getAbsoluteFile());
+      } catch (IOException e) {
+        e.printStackTrace();
+      }
     }
   }
 

+ 10 - 8
src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingKeyValue.java

@@ -21,6 +21,8 @@ package org.apache.hadoop.streaming;
 import junit.framework.TestCase;
 import java.io.*;
 
+import org.apache.hadoop.fs.FileUtil;
+
 /**
  * This class tests hadoopStreaming in MapReduce local mode.
  * This testcase looks at different cases of tab position in input. 
@@ -78,7 +80,7 @@ public class TestStreamingKeyValue extends TestCase
     File outFile = null;
     try {
       try {
-        OUTPUT_DIR.getAbsoluteFile().delete();
+        FileUtil.fullyDelete(OUTPUT_DIR.getAbsoluteFile());
       } catch (Exception e) {
       }
 
@@ -96,13 +98,13 @@ public class TestStreamingKeyValue extends TestCase
       assertEquals(outputExpect, output);
     } catch(Exception e) {
       failTrace(e);
-    } finally {
-      outFile.delete();
-      File outFileCRC = new File(OUTPUT_DIR,
-                          "." + outFileName + ".crc").getAbsoluteFile();
-      INPUT_FILE.delete();
-      outFileCRC.delete();
-      OUTPUT_DIR.getAbsoluteFile().delete();
+    } finally { 
+      try {
+        INPUT_FILE.delete();
+        FileUtil.fullyDelete(OUTPUT_DIR.getAbsoluteFile());
+      } catch(Exception e) {
+        failTrace(e);
+      }
     }
   }
 

+ 8 - 5
src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingSeparator.java

@@ -23,6 +23,7 @@ import java.io.*;
 import java.util.*;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 
 /**
@@ -89,7 +90,7 @@ public class TestStreamingSeparator extends TestCase
   {
     try {
       try {
-        OUTPUT_DIR.getAbsoluteFile().delete();
+        FileUtil.fullyDelete(OUTPUT_DIR.getAbsoluteFile());
       } catch (Exception e) {
       }
 
@@ -109,10 +110,12 @@ public class TestStreamingSeparator extends TestCase
     } catch(Exception e) {
       failTrace(e);
     } finally {
-      File outFileCRC = new File(OUTPUT_DIR, ".part-00000.crc").getAbsoluteFile();
-      INPUT_FILE.delete();
-      outFileCRC.delete();
-      OUTPUT_DIR.getAbsoluteFile().delete();
+      try {
+        INPUT_FILE.delete();
+        FileUtil.fullyDelete(OUTPUT_DIR.getAbsoluteFile());
+      } catch(Exception e) {
+        failTrace(e);
+      }
     }
   }
 

+ 15 - 5
src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestSymLink.java

@@ -18,15 +18,23 @@
 
 package org.apache.hadoop.streaming;
 
+import java.io.BufferedReader;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+
 import junit.framework.TestCase;
-import java.io.*;
-import java.util.*;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.*;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MiniMRCluster;
+import org.apache.hadoop.mapred.Utils;
 /**
  * This test case tests the symlink creation
  * utility provided by distributed caching 
@@ -88,7 +96,8 @@ public class TestSymLink extends TestCase
           "-cacheFile", "hdfs://"+fileSys.getName()+CACHE_FILE + "#testlink"
         };
 
-        fileSys.delete(new Path(OUTPUT_DIR));
+        fileSys.delete(new Path(OUTPUT_DIR), true);
+      
         
         DataOutputStream file = fileSys.create(new Path(INPUT_FILE));
         file.writeBytes(mapString);
@@ -104,7 +113,8 @@ public class TestSymLink extends TestCase
         String line = null;
         Path[] fileList = FileUtil.stat2Paths(fileSys.listStatus(
                                                 new Path(OUTPUT_DIR),
-                                                new OutputLogFilter()));
+                                                new Utils.OutputFileUtils
+                                                .OutputFilesFilter()));
         for (int i = 0; i < fileList.length; i++){
           System.out.println(fileList[i].toString());
           BufferedReader bread =

+ 45 - 1
src/mapred/org/apache/hadoop/mapred/FileOutputCommitter.java

@@ -39,6 +39,9 @@ public class FileOutputCommitter extends OutputCommitter {
    * Temporary directory name 
    */
   public static final String TEMP_DIR_NAME = "_temporary";
+  public static final String SUCCEEDED_FILE_NAME = "_SUCCESS";
+  static final String SUCCESSFUL_JOB_OUTPUT_DIR_MARKER =
+    "mapreduce.fileoutputcommitter.marksuccessfuljobs";
 
   public void setupJob(JobContext context) throws IOException {
     JobConf conf = context.getJobConf();
@@ -52,7 +55,35 @@ public class FileOutputCommitter extends OutputCommitter {
     }
   }
 
-  public void cleanupJob(JobContext context) throws IOException {
+  private static boolean getOutputDirMarking(JobConf conf) {
+    return conf.getBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, 
+                           true);
+  }
+
+  // Mark the output dir of the job for which the context is passed.
+  private void markSuccessfulOutputDir(JobContext context) 
+  throws IOException {
+    JobConf conf = context.getJobConf();
+    Path outputPath = FileOutputFormat.getOutputPath(conf);
+    if (outputPath != null) {
+      FileSystem fileSys = outputPath.getFileSystem(conf);
+      // create a file in the folder to mark it
+      if (fileSys.exists(outputPath)) {
+        Path filePath = new Path(outputPath, SUCCEEDED_FILE_NAME);
+        fileSys.create(filePath).close();
+      }
+    }
+  }
+  
+  @Override
+  public void commitJob(JobContext context) throws IOException {
+    cleanup(context);
+    if (getOutputDirMarking(context.getJobConf())) {
+      markSuccessfulOutputDir(context);
+    }
+  }
+  
+  private void cleanup(JobContext context) throws IOException {
     JobConf conf = context.getJobConf();
     // do the clean up of temporary directory
     Path outputPath = FileOutputFormat.getOutputPath(conf);
@@ -63,9 +94,22 @@ public class FileOutputCommitter extends OutputCommitter {
       if (fileSys.exists(tmpDir)) {
         fileSys.delete(tmpDir, true);
       }
+    } else {
+      LOG.warn("Output path is null in cleanup");
     }
   }
 
+  /**
+   * Delete the temporary directory, including all of the work directories.
+   * @param context the job's context
+   * @param runState final run state of the job, should be
+   * {@link JobStatus#KILLED} or {@link JobStatus#FAILED}
+   */
+  @Override
+  public void abortJob(JobContext context, int runState) throws IOException {
+    cleanup(context);
+  }
+  
   public void setupTask(TaskAttemptContext context) throws IOException {
     // FileOutputCommitter's setupTask doesn't do anything. Because the
     // temporary task directory is created on demand when the 

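A brief usage sketch for the new behavior above: the _SUCCESS marker is written on commit by default and can be disabled per job through the new configuration key. Only the key name and its default of true come from the patch; the surrounding class and method are illustrative.

import org.apache.hadoop.mapred.JobConf;

public class DisableSuccessMarker {
  static JobConf configure() {
    JobConf conf = new JobConf();
    // Defaults to true; setting false skips creating _SUCCESS on job commit.
    conf.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", false);
    return conf;
  }
}
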
+ 14 - 0
src/mapred/org/apache/hadoop/mapred/JobInProgress.java

@@ -1215,6 +1215,20 @@ class JobInProgress {
       if (result != null) {
         addRunningTaskToTIP(tip, result.getTaskID(), tts, true);
       }
+
+      if (result != null) {
+        addRunningTaskToTIP(tip, result.getTaskID(), tts, true);
+        if (jobFailed) {
+          result.setJobCleanupTaskState
+          (org.apache.hadoop.mapreduce.JobStatus.State.FAILED);
+        } else if (jobKilled) {
+          result.setJobCleanupTaskState
+          (org.apache.hadoop.mapreduce.JobStatus.State.KILLED);
+        } else {
+          result.setJobCleanupTaskState
+          (org.apache.hadoop.mapreduce.JobStatus.State.SUCCEEDED);
+        }
+      }
       return result;
     }
     

+ 6 - 0
src/mapred/org/apache/hadoop/mapred/JobStatus.java

@@ -322,4 +322,10 @@ public class JobStatus implements Writable, Cloneable {
     this.priority = WritableUtils.readEnum(in, JobPriority.class);
     this.schedulingInfo = Text.readString(in);
   }
+
+  // A utility to convert new job runstates to the old ones.
+  static int getOldNewJobRunState(
+    org.apache.hadoop.mapreduce.JobStatus.State state) {
+    return state.getValue();
+  }
 }

+ 2 - 2
src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java

@@ -231,7 +231,7 @@ class LocalJobRunner implements JobSubmissionProtocol {
           }
         }
         // delete the temporary directory in output directory
-        outputCommitter.cleanupJob(jContext);
+        outputCommitter.commitJob(jContext);
         status.setCleanupProgress(1.0f);
 
         if (killed) {
@@ -244,7 +244,7 @@ class LocalJobRunner implements JobSubmissionProtocol {
 
       } catch (Throwable t) {
         try {
-          outputCommitter.cleanupJob(jContext);
+          outputCommitter.abortJob(jContext, JobStatus.FAILED);
         } catch (IOException ioe) {
           LOG.info("Error cleaning up job:" + id);
         }

+ 54 - 2
src/mapred/org/apache/hadoop/mapred/OutputCommitter.java

@@ -68,12 +68,36 @@ public abstract class OutputCommitter
 
   /**
    * For cleaning up the job's output after job completion
+   * @deprecated use {@link #commitJob(JobContext)} or
+   *                 {@link #abortJob(JobContext, int)} instead
+   */
+  @Deprecated
+  public void cleanupJob(JobContext jobContext) throws IOException { }
+
+  /**
+   * For committing job's output after successful job completion. Note that this
+   * is invoked for jobs with final run state as {@link JobStatus#SUCCEEDED}.
    * 
    * @param jobContext Context of the job whose output is being written.
-   * @throws IOException
+   * @throws IOException 
    */
-  public abstract void cleanupJob(JobContext jobContext) throws IOException;
+  public void commitJob(JobContext jobContext) throws IOException { 
+    cleanupJob(jobContext);
+  }
 
+  /**
+   * For cleaning up the job's output after job failure.
+   * 
+   * @param jobContext Context of the job whose output is being written.
+   * @param status Final run state of the job, should be 
+   * {@link JobStatus#KILLED} or {@link JobStatus#FAILED}
+   * @throws IOException
+   */
+  public void abortJob(JobContext jobContext, int status) 
+  throws IOException {
+    cleanupJob(jobContext);
+  }
+  
   /**
    * Sets up output for the task.
    * 
@@ -130,11 +154,39 @@ public abstract class OutputCommitter
    * is a bridge between the two.
    */
   @Override
+  @Deprecated
   public final void cleanupJob(org.apache.hadoop.mapreduce.JobContext context
                                ) throws IOException {
     cleanupJob((JobContext) context);
   }
 
+  /**
+   * This method implements the new interface by calling the old method. Note
+   * that the input types are different between the new and old apis and this
+   * is a bridge between the two.
+   */
+  @Override
+  public final void commitJob(org.apache.hadoop.mapreduce.JobContext context
+                               ) throws IOException {
+    commitJob((JobContext) context);
+  }
+
+  /**
+   * This method implements the new interface by calling the old method. Note
+   * that the input types are different between the new and old apis and this
+   * is a bridge between the two.
+   */
+  @Override
+  public final void abortJob(org.apache.hadoop.mapreduce.JobContext context, 
+                          org.apache.hadoop.mapreduce.JobStatus.State runState) 
+  throws IOException {
+    int state = JobStatus.getOldNewJobRunState(runState);
+    if (state != JobStatus.FAILED && state != JobStatus.KILLED) {
+      throw new IOException ("Invalid job run state : " + runState.name());
+    }
+    abortJob((JobContext) context, state);
+  }
+  
   /**
    * This method implements the new interface by calling the old method. Note
    * that the input types are different between the new and old apis and this

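The deprecation path above keeps existing committers working, since both commitJob and abortJob default to calling the old cleanupJob, while new committers override the split hooks directly. A hedged sketch of a new-style override against the old (mapred) API; the class name and the comment bodies are illustrative, the method signatures follow the patch.

import java.io.IOException;
import org.apache.hadoop.mapred.FileOutputCommitter;
import org.apache.hadoop.mapred.JobContext;

public class AuditingCommitter extends FileOutputCommitter {
  @Override
  public void commitJob(JobContext context) throws IOException {
    // removes _temporary and, unless disabled, writes the _SUCCESS marker
    super.commitJob(context);
    // illustrative: record the successful commit somewhere
  }

  @Override
  public void abortJob(JobContext context, int runState) throws IOException {
    // runState is JobStatus.FAILED or JobStatus.KILLED per the new contract
    super.abortJob(context, runState);  // removes _temporary only
  }
}
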
+ 7 - 1
src/mapred/org/apache/hadoop/mapred/OutputLogFilter.java

@@ -27,9 +27,15 @@ import org.apache.hadoop.fs.PathFilter;
  * This can be used to list paths of output directory as follows:
  *   Path[] fileList = FileUtil.stat2Paths(fs.listStatus(outDir,
  *                                   new OutputLogFilter()));
+ * @deprecated Use 
+ *   {@link org.apache.hadoop.mapred.Utils.OutputFileUtils.OutputLogFilter} 
+ *   instead.
  */
 public class OutputLogFilter implements PathFilter {
+  private static final PathFilter LOG_FILTER = 
+    new Utils.OutputFileUtils.OutputLogFilter();
+     
   public boolean accept(Path path) {
-    return !(path.toString().contains("_logs"));
+    return LOG_FILTER.accept(path);
   }
 }

+ 41 - 1
src/mapred/org/apache/hadoop/mapred/Task.java

@@ -42,9 +42,11 @@ import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.io.RawComparator;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.io.serializer.Deserializer;
 import org.apache.hadoop.io.serializer.SerializationFactory;
 import org.apache.hadoop.mapred.IFile.Writer;
+import org.apache.hadoop.mapreduce.JobStatus;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.util.Progress;
 import org.apache.hadoop.util.Progressable;
@@ -117,6 +119,7 @@ abstract public class Task implements Writable, Configurable {
   private TaskAttemptID taskId;                   // unique, includes job id
   private int partition;                          // id within job
   TaskStatus taskStatus;                          // current status of the task
+  protected JobStatus.State jobRunStateForCleanup;
   protected boolean jobCleanup = false;
   protected boolean jobSetup = false;
   protected boolean taskCleanup = false;
@@ -309,6 +312,14 @@ abstract public class Task implements Writable, Configurable {
     return jobCleanup;
   }
 
+  boolean isJobAbortTask() {
+    // the task is an abort task if its marked for cleanup and the final 
+    // expected state is either failed or killed.
+    return isJobCleanupTask() 
+           && (jobRunStateForCleanup == JobStatus.State.KILLED 
+               || jobRunStateForCleanup == JobStatus.State.FAILED);
+  }
+  
   boolean isJobSetupTask() {
     return jobSetup;
   }
@@ -321,6 +332,14 @@ abstract public class Task implements Writable, Configurable {
     jobCleanup = true; 
   }
 
+  /**
+   * Sets the task to do job abort in the cleanup.
+   * @param status the final runstate of the job 
+   */
+  void setJobCleanupTaskState(JobStatus.State status) {
+    jobRunStateForCleanup = status;
+  }
+  
   boolean isMapOrReduce() {
     return !jobSetup && !jobCleanup && !taskCleanup;
   }
@@ -341,6 +360,9 @@ abstract public class Task implements Writable, Configurable {
     skipRanges.write(out);
     out.writeBoolean(skipping);
     out.writeBoolean(jobCleanup);
+    if (jobCleanup) {
+      WritableUtils.writeEnum(out, jobRunStateForCleanup);
+    }
     out.writeBoolean(jobSetup);
     Text.writeString(out, username);
     out.writeBoolean(writeSkipRecs);
@@ -359,6 +381,10 @@ abstract public class Task implements Writable, Configurable {
     currentRecStartIndex = currentRecIndexIterator.next();
     skipping = in.readBoolean();
     jobCleanup = in.readBoolean();
+    if (jobCleanup) {
+      jobRunStateForCleanup = 
+        WritableUtils.readEnum(in, JobStatus.State.class);
+    }
     jobSetup = in.readBoolean();
     username = Text.readString(in);
     writeSkipRecs = in.readBoolean();
@@ -832,7 +858,21 @@ abstract public class Task implements Writable, Configurable {
     getProgress().setStatus("cleanup");
     statusUpdate(umbilical);
     // do the cleanup
-    committer.cleanupJob(jobContext);
+    LOG.info("Cleaning up job");
+    if (jobRunStateForCleanup == JobStatus.State.FAILED 
+        || jobRunStateForCleanup == JobStatus.State.KILLED) {
+      LOG.info("Aborting job with runstate : " + jobRunStateForCleanup);
+          committer.abortJob(jobContext, jobRunStateForCleanup);
+    } else if (jobRunStateForCleanup == JobStatus.State.SUCCEEDED){
+      LOG.info("Committing job");
+      committer.commitJob(jobContext);
+    } else {
+      throw new IOException("Invalid state of the job for cleanup. State found "
+                            + jobRunStateForCleanup + " expecting "
+                            + JobStatus.State.SUCCEEDED + ", " 
+                            + JobStatus.State.FAILED + " or "
+                            + JobStatus.State.KILLED);
+    }
     done(umbilical, reporter);
   }
 

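The Task change above serializes the cleanup run state only when the jobCleanup flag is set, using the WritableUtils enum helpers. A self-contained sketch of that conditional write/read round trip, assuming the same helpers; the class and variable names are made up.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.mapreduce.JobStatus;

public class CleanupStateRoundTrip {
  public static void main(String[] args) throws IOException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(bytes);
    boolean jobCleanup = true;
    out.writeBoolean(jobCleanup);
    if (jobCleanup) {
      // written only when the flag is set, mirroring Task.write()
      WritableUtils.writeEnum(out, JobStatus.State.KILLED);
    }
    out.flush();

    DataInputStream in =
        new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()));
    if (in.readBoolean()) {
      // read back only when the flag was set, mirroring Task.readFields()
      JobStatus.State state = WritableUtils.readEnum(in, JobStatus.State.class);
      System.out.println("cleanup state = " + state);
    }
  }
}
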
+ 60 - 0
src/mapred/org/apache/hadoop/mapred/Utils.java

@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+
+/**
+ * A utility class. It provides
+ *   - file-util
+ *     - A path filter utility to filter out output/part files in the output dir
+ */
+public class Utils {
+  public static class OutputFileUtils {
+    /**
+     * This class filters output(part) files from the given directory
+     * It does not accept files with filenames _logs and _SUCCESS.
+     * This can be used to list paths of output directory as follows:
+     *   Path[] fileList = FileUtil.stat2Paths(fs.listStatus(outDir,
+     *                                         new OutputFilesFilter()));
+     */
+    public static class OutputFilesFilter extends OutputLogFilter {
+      public boolean accept(Path path) {
+        return super.accept(path) 
+               && !FileOutputCommitter.SUCCEEDED_FILE_NAME
+                   .equals(path.getName());
+      }
+    }
+    
+    /**
+     * This class filters log files from directory given
+     * It doesnt accept paths having _logs.
+     * This can be used to list paths of output directory as follows:
+     *   Path[] fileList = FileUtil.stat2Paths(fs.listStatus(outDir,
+     *                                   new OutputLogFilter()));
+     */
+    public static class OutputLogFilter implements PathFilter {
+      public boolean accept(Path path) {
+        return !(path.toString().contains("_logs"));
+      }
+    }
+  }
+}
+

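Most of the test changes in this commit switch from OutputLogFilter to the new Utils.OutputFileUtils.OutputFilesFilter so that the _SUCCESS marker is not picked up as a part file. A small sketch of that listing idiom, matching how the updated tests call it; the wrapper class and method are illustrative.

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.Utils;

public class ListPartFiles {
  // Lists only output part files, skipping _logs and the new _SUCCESS marker.
  static Path[] partFiles(Configuration conf, Path outDir) throws IOException {
    FileSystem fs = outDir.getFileSystem(conf);
    return FileUtil.stat2Paths(
        fs.listStatus(outDir, new Utils.OutputFileUtils.OutputFilesFilter()));
  }
}
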
+ 45 - 0
src/mapred/org/apache/hadoop/mapreduce/JobStatus.java

@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce;
+
+/**
+ * Describes the current status of a job.
+ */
+public class JobStatus {
+  /**
+   * Current state of the job 
+   */
+  public static enum State {
+    RUNNING(1),
+    SUCCEEDED(2),
+    FAILED(3),
+    PREP(4),
+    KILLED(5);
+
+    int value;
+
+    State(int value) {
+      this.value = value;
+    }
+
+    public int getValue() {
+      return value;
+    }
+  }
+}

+ 29 - 2
src/mapred/org/apache/hadoop/mapreduce/OutputCommitter.java

@@ -65,13 +65,40 @@ public abstract class OutputCommitter {
   public abstract void setupJob(JobContext jobContext) throws IOException;
 
   /**
-   * For cleaning up the job's output after job completion
+   * For cleaning up the job's output after job completion. Note that this
+   * is invoked for jobs with final run state as 
+   * {@link JobStatus.State#SUCCEEDED}
    * 
    * @param jobContext Context of the job whose output is being written.
    * @throws IOException
    */
-  public abstract void cleanupJob(JobContext jobContext) throws IOException;
+  public void commitJob(JobContext jobContext) throws IOException {
+    cleanupJob(jobContext);
+  }
+
+  /**
+   * For cleaning up the job's output after job completion
+   * @deprecated use {@link #commitJob(JobContext)} or
+   *                 {@link #abortJob(JobContext, JobStatus.State)} instead
+   */
+  @Deprecated
+  public void cleanupJob(JobContext context) throws IOException { }
 
+  /**
+   * For aborting an unsuccessful job's output. Note that this is invoked for 
+   * jobs with final run state as {@link JobStatus.State#FAILED} or 
+   * {@link JobStatus.State#KILLED}.
+ 
+   * @param jobContext Context of the job whose output is being written.
+   * @param state final run state of the job, should be either 
+   * {@link JobStatus.State#KILLED} or {@link JobStatus.State#FAILED} 
+   * @throws IOException
+   */
+  public void abortJob(JobContext jobContext, JobStatus.State state) 
+  throws IOException {
+    cleanupJob(jobContext);
+  }
+  
   /**
    * Sets up output for the task.
    * 

+ 50 - 2
src/mapred/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java

@@ -23,10 +23,12 @@ import java.net.URI;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobStatus;
 import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
@@ -43,6 +45,9 @@ public class FileOutputCommitter extends OutputCommitter {
    * Temporary directory name 
    */
   protected static final String TEMP_DIR_NAME = "_temporary";
+  public static final String SUCCEEDED_FILE_NAME = "_SUCCESS";
+  static final String SUCCESSFUL_JOB_OUTPUT_DIR_MARKER =
+    "mapreduce.fileoutputcommitter.marksuccessfuljobs";
   private FileSystem outputFileSystem = null;
   private Path outputPath = null;
   private Path workPath = null;
@@ -80,20 +85,63 @@ public class FileOutputCommitter extends OutputCommitter {
     }
   }
 
+  private static boolean shouldMarkOutputDir(Configuration conf) {
+    return conf.getBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, 
+                           true);
+  }
+
+  // Mark the output dir of the job for which the context is passed.
+  private void markOutputDirSuccessful(JobContext context)
+  throws IOException {
+    if (outputPath != null) {
+      FileSystem fileSys = outputPath.getFileSystem(context.getConfiguration());
+      if (fileSys.exists(outputPath)) {
+        // create a file in the folder to mark it
+        Path filePath = new Path(outputPath, SUCCEEDED_FILE_NAME);
+        fileSys.create(filePath).close();
+      }
+    }
+  }
+
   /**
    * Delete the temporary directory, including all of the work directories.
-   * @param context the job's context
+   * This is called for all jobs whose final run state is SUCCEEDED
+   * @param context the job's context.
    */
-  public void cleanupJob(JobContext context) throws IOException {
+  public void commitJob(JobContext context) throws IOException {
+    // delete the _temporary folder
+    cleanup(context);
+    // check if the o/p dir should be marked
+    if (shouldMarkOutputDir(context.getConfiguration())) {
+      // create a _success file in the o/p folder
+      markOutputDirSuccessful(context);
+    }
+  }
+
+  private void cleanup(JobContext context) 
+  throws IOException {
     if (outputPath != null) {
       Path tmpDir = new Path(outputPath, FileOutputCommitter.TEMP_DIR_NAME);
       FileSystem fileSys = tmpDir.getFileSystem(context.getConfiguration());
       if (fileSys.exists(tmpDir)) {
         fileSys.delete(tmpDir, true);
       }
+    } else {
+      LOG.warn("Output path is null in cleanup");
     }
   }
 
+  /**
+   * Delete the temporary directory, including all of the work directories.
+   * @param context the job's context
+   * @param state final run state of the job, should be FAILED or KILLED
+   */
+  @Override
+  public void abortJob(JobContext context, JobStatus.State state)
+  throws IOException {
+    cleanup(context);
+  }
+  
   /**
    * No task setup required.
    */

+ 1 - 0
src/mapred/org/apache/hadoop/mapreduce/lib/output/NullOutputFormat.java

@@ -46,6 +46,7 @@ public class NullOutputFormat<K, V> extends OutputFormat<K, V> {
     return new OutputCommitter() {
       public void abortTask(TaskAttemptContext taskContext) { }
       public void cleanupJob(JobContext jobContext) { }
+      public void commitJob(JobContext jobContext) { }
       public void commitTask(TaskAttemptContext taskContext) { }
       public boolean needsTaskCommit(TaskAttemptContext taskContext) {
         return false;

+ 21 - 1
src/test/findbugsExcludeFile.xml

@@ -28,6 +28,16 @@
        <Field name="out" />
        <Bug pattern="IS2_INCONSISTENT_SYNC" />
      </Match>
+     <Match>
+       <Class name="org.apache.hadoop.mapred.OutputCommitter" />
+       <Or>
+       <Method name="abortJob" />
+       <Method name="commitJob" />
+       <Method name="cleanupJob" />
+       </Or>
+       <Bug pattern="NM_WRONG_PACKAGE_INTENTIONAL" />
+     </Match>
+
      <!--
        TFile
      -->
@@ -55,5 +65,15 @@
        <Method name="reportFatalError" />
        <Bug pattern="DM_EXIT" />
      </Match>
-    
+    <!--
+       We need to cast objects between old and new api objects
+     -->
+     <Match>
+       <Class name="org.apache.hadoop.mapred.OutputCommitter" />
+       <Bug pattern="BC_UNCONFIRMED_CAST" />
+     </Match>
+     <Match>
+       <Class name="org.apache.hadoop.mapred.FileOutputCommitter" />
+       <Bug pattern="NM_WRONG_PACKAGE_INTENTIONAL" />
+     </Match>
 </FindBugsFilter>

+ 4 - 1
src/test/mapred-site.xml

@@ -14,5 +14,8 @@
   <value>hosts.exclude</value>
   <description></description>
 </property>
-
+<property>
+  <name>mapreduce.fileoutputcommitter.marksuccessfuljobs</name>
+  <value>false</value>
+</property>
 </configuration>

+ 2 - 2
src/test/org/apache/hadoop/conf/TestNoDefaultsJobConf.java

@@ -82,8 +82,8 @@ public class TestNoDefaultsJobConf extends HadoopTestCase {
     JobClient.runJob(conf);
 
     Path[] outputFiles = FileUtil.stat2Paths(
-                           getFileSystem().listStatus(outDir,
-                           new OutputLogFilter()));
+                   getFileSystem().listStatus(outDir,
+                   new Utils.OutputFileUtils.OutputFilesFilter()));
     if (outputFiles.length > 0) {
       InputStream is = getFileSystem().open(outputFiles[0]);
       BufferedReader reader = new BufferedReader(new InputStreamReader(is));

+ 1 - 1
src/test/org/apache/hadoop/mapred/TestBadRecords.java

@@ -165,7 +165,7 @@ public class TestBadRecords extends ClusterMapReduceTestCase {
     
     Path[] outputFiles = FileUtil.stat2Paths(
         getFileSystem().listStatus(getOutputDir(),
-        new OutputLogFilter()));
+        new Utils.OutputFileUtils.OutputFilesFilter()));
     
     List<String> mapperOutput=getProcessed(input, mapperBadRecords);
     LOG.debug("mapperOutput " + mapperOutput.size());

+ 2 - 2
src/test/org/apache/hadoop/mapred/TestClusterMapReduceTestCase.java

@@ -63,8 +63,8 @@ public class TestClusterMapReduceTestCase extends ClusterMapReduceTestCase {
     JobClient.runJob(conf);
 
     Path[] outputFiles = FileUtil.stat2Paths(
-                           getFileSystem().listStatus(getOutputDir(),
-                           new OutputLogFilter()));
+                    getFileSystem().listStatus(getOutputDir(),
+                    new Utils.OutputFileUtils.OutputFilesFilter()));
     if (outputFiles.length > 0) {
       InputStream is = getFileSystem().open(outputFiles[0]);
       BufferedReader reader = new BufferedReader(new InputStreamReader(is));

+ 4 - 3
src/test/org/apache/hadoop/mapred/TestEmptyJob.java

@@ -52,7 +52,7 @@ public class TestEmptyJob extends TestCase {
    */
   static class CommitterWithDelayCleanup extends FileOutputCommitter {
     @Override
-    public void cleanupJob(JobContext context) throws IOException {
+    public void commitJob(JobContext context) throws IOException {
       Configuration conf = context.getConfiguration();
       Path share = new Path(conf.get("share"));
       FileSystem fs = FileSystem.get(conf);
@@ -64,7 +64,7 @@ public class TestEmptyJob extends TestCase {
         }
         UtilsForTests.waitFor(100);
       }
-      super.cleanupJob(context);
+      super.commitJob(context);
     }
   }
 
@@ -195,7 +195,8 @@ public class TestEmptyJob extends TestCase {
         + " and not 1.0", runningJob.cleanupProgress() == 1.0);
 
     assertTrue("Job output directory doesn't exit!", fs.exists(outDir));
-    FileStatus[] list = fs.listStatus(outDir, new OutputLogFilter());
+    FileStatus[] list = fs.listStatus(outDir, 
+        new Utils.OutputFileUtils.OutputFilesFilter());
     assertTrue("Number of part-files is " + list.length + " and not "
         + numReduces, list.length == numReduces);
 

+ 1 - 1
src/test/org/apache/hadoop/mapred/TestFileOutputCommitter.java

@@ -73,7 +73,7 @@ public class TestFileOutputCommitter extends TestCase {
       theRecordWriter.close(reporter);
     }
     committer.commitTask(tContext);
-    committer.cleanupJob(jContext);
+    committer.commitJob(jContext);
     
     File expectedFile = new File(new Path(outDir, file).toString());
     StringBuffer expectedOutput = new StringBuffer();

+ 2 - 2
src/test/org/apache/hadoop/mapred/TestJavaSerialization.java

@@ -96,7 +96,7 @@ public class TestJavaSerialization extends ClusterMapReduceTestCase {
 
     Path[] outputFiles = FileUtil.stat2Paths(
                            getFileSystem().listStatus(getOutputDir(),
-                           new OutputLogFilter()));
+                           new Utils.OutputFileUtils.OutputFilesFilter()));
     assertEquals(1, outputFiles.length);
     InputStream is = getFileSystem().open(outputFiles[0]);
     BufferedReader reader = new BufferedReader(new InputStreamReader(is));
@@ -144,7 +144,7 @@ public class TestJavaSerialization extends ClusterMapReduceTestCase {
 
     Path[] outputFiles = FileUtil.stat2Paths(
                            getFileSystem().listStatus(getOutputDir(),
-                           new OutputLogFilter()));
+                           new Utils.OutputFileUtils.OutputFilesFilter()));
     assertEquals(1, outputFiles.length);
 }
 

+ 267 - 0
src/test/org/apache/hadoop/mapred/TestJobCleanup.java

@@ -0,0 +1,267 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.IOException;
+
+import junit.extensions.TestSetup;
+import junit.framework.Test;
+import junit.framework.TestCase;
+import junit.framework.TestSuite;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.lib.IdentityMapper;
+import org.apache.hadoop.mapred.lib.IdentityReducer;
+
+/**
+ * A JUnit test to test Map-Reduce job cleanup.
+ */
+public class TestJobCleanup extends TestCase {
+  private static String TEST_ROOT_DIR =
+      new File(System.getProperty("test.build.data", "/tmp") + "/" 
+               + "test-job-cleanup").toString();
+  private static final String ABORT_KILLED_FILE_NAME = 
+    "_custom_abort_killed";
+  private static final String ABORT_FAILED_FILE_NAME = 
+    "_custom_abort_failed";
+  private static FileSystem fileSys = null;
+  private static MiniMRCluster mr = null;
+  private static Path inDir = null;
+  private static Path emptyInDir = null;
+  private static int outDirs = 0;
+  
+  public static Test suite() {
+    TestSetup setup = new TestSetup(new TestSuite(TestJobCleanup.class)) {
+      protected void setUp() throws Exception {
+        JobConf conf = new JobConf();
+        fileSys = FileSystem.get(conf);
+        fileSys.delete(new Path(TEST_ROOT_DIR), true);
+        conf.set("mapred.job.tracker.handler.count", "1");
+        conf.set("mapred.job.tracker", "127.0.0.1:0");
+        conf.set("mapred.job.tracker.http.address", "127.0.0.1:0");
+        conf.set("mapred.task.tracker.http.address", "127.0.0.1:0");
+
+        mr = new MiniMRCluster(1, "file:///", 1, null, null, conf);
+        inDir = new Path(TEST_ROOT_DIR, "test-input");
+        String input = "The quick brown fox\n" + "has many silly\n"
+                       + "red fox sox\n";
+        DataOutputStream file = fileSys.create(new Path(inDir, "part-" + 0));
+        file.writeBytes(input);
+        file.close();
+        emptyInDir = new Path(TEST_ROOT_DIR, "empty-input");
+        fileSys.mkdirs(emptyInDir);
+      }
+      
+      protected void tearDown() throws Exception {
+        if (fileSys != null) {
+          fileSys.close();
+        }
+        if (mr != null) {
+          mr.shutdown();
+        }
+      }
+    };
+    return setup;
+  }
+  
+  /** 
+   * Committer with abort making a _failed/_killed in the output folder
+   */
+  static class CommitterWithCustomAbort extends FileOutputCommitter {
+    @Override
+    public void abortJob(JobContext context, int state) 
+    throws IOException {
+      JobConf conf = context.getJobConf();
+      Path outputPath = FileOutputFormat.getOutputPath(conf);
+      FileSystem fs = outputPath.getFileSystem(conf);
+      String fileName = (state == JobStatus.FAILED) 
+                        ? TestJobCleanup.ABORT_FAILED_FILE_NAME 
+                        : TestJobCleanup.ABORT_KILLED_FILE_NAME;
+      fs.create(new Path(outputPath, fileName)).close();
+    }
+  }
+  
+  private Path getNewOutputDir() {
+    return new Path(TEST_ROOT_DIR, "output-" + outDirs++);
+  }
+  
+  private void configureJob(JobConf jc, String jobName, int maps, int reds, 
+                            Path outDir) {
+    jc.setJobName(jobName);
+    jc.setInputFormat(TextInputFormat.class);
+    jc.setOutputKeyClass(LongWritable.class);
+    jc.setOutputValueClass(Text.class);
+    FileInputFormat.setInputPaths(jc, inDir);
+    FileOutputFormat.setOutputPath(jc, outDir);
+    jc.setMapperClass(IdentityMapper.class);
+    jc.setReducerClass(IdentityReducer.class);
+    jc.setNumMapTasks(maps);
+    jc.setNumReduceTasks(reds);
+    jc.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", true);
+  }
+  
+  // run a job with 1 map and let it run to completion
+  private void testSuccessfulJob(String filename, 
+    Class<? extends OutputCommitter> committer, String[] exclude) 
+  throws IOException {
+    JobConf jc = mr.createJobConf();
+    Path outDir = getNewOutputDir();
+    configureJob(jc, "job with cleanup()", 1, 0, outDir);
+    jc.setOutputCommitter(committer);
+    
+    JobClient jobClient = new JobClient(jc);
+    RunningJob job = jobClient.submitJob(jc);
+    JobID id = job.getID();
+    job.waitForCompletion();
+    
+    Path testFile = new Path(outDir, filename);
+    assertTrue("Done file missing for job " + id, fileSys.exists(testFile));
+    
+    // check if the files from the missing set exists
+    for (String ex : exclude) {
+      Path file = new Path(outDir, ex);
+      assertFalse("File " + file + " should not be present for successful job " 
+                  + id, fileSys.exists(file));
+    }
+  }
+  
+  // run a job for which all the attempts simply fail.
+  private void testFailedJob(String fileName, 
+    Class<? extends OutputCommitter> committer, String[] exclude) 
+  throws IOException {
+    JobConf jc = mr.createJobConf();
+    Path outDir = getNewOutputDir();
+    configureJob(jc, "fail job with abort()", 1, 0, outDir);
+    jc.setMaxMapAttempts(1);
+    // set the job to fail
+    jc.setMapperClass(UtilsForTests.FailMapper.class);
+    jc.setOutputCommitter(committer);
+
+    JobClient jobClient = new JobClient(jc);
+    RunningJob job = jobClient.submitJob(jc);
+    JobID id = job.getID();
+    job.waitForCompletion();
+    
+    if (fileName != null) {
+      Path testFile = new Path(outDir, fileName);
+      assertTrue("File " + testFile + " missing for failed job " + id, 
+                 fileSys.exists(testFile));
+    }
+    
+    // check if the files from the missing set exists
+    for (String ex : exclude) {
+      Path file = new Path(outDir, ex);
+      assertFalse("File " + file + " should not be present for failed job "
+                  + id, fileSys.exists(file));
+    }
+  }
+  
+  // run a job which gets stuck in mapper and kill it.
+  private void testKilledJob(String fileName,
+    Class<? extends OutputCommitter> committer, String[] exclude) 
+  throws IOException {
+    JobConf jc = mr.createJobConf();
+    Path outDir = getNewOutputDir();
+    configureJob(jc, "kill job with abort()", 1, 0, outDir);
+    // set the job to wait for long
+    jc.setMapperClass(UtilsForTests.KillMapper.class);
+    jc.setOutputCommitter(committer);
+    
+    JobClient jobClient = new JobClient(jc);
+    RunningJob job = jobClient.submitJob(jc);
+    JobID id = job.getID();
+    JobInProgress jip = 
+      mr.getJobTrackerRunner().getJobTracker().getJob(job.getID());
+    
+    // wait for the map to be launched
+    while (true) {
+      if (jip.runningMaps() == 1) {
+        break;
+      }
+      UtilsForTests.waitFor(100);
+    }
+    
+    job.killJob(); // kill the job
+    
+    job.waitForCompletion(); // wait for the job to complete
+    
+    if (fileName != null) {
+      Path testFile = new Path(outDir, fileName);
+      assertTrue("File " + testFile + " missing for job " + id, 
+                 fileSys.exists(testFile));
+    }
+    
+    // check if the files from the missing set exists
+    for (String ex : exclude) {
+      Path file = new Path(outDir, ex);
+      assertFalse("File " + file + " should not be present for killed job "
+                  + id, fileSys.exists(file));
+    }
+  }
+  
+  /**
+   * Test default cleanup/abort behavior
+   * 
+   * @throws IOException
+   */
+  public void testDefaultCleanupAndAbort() throws IOException {
+    // check with a successful job
+    testSuccessfulJob(FileOutputCommitter.SUCCEEDED_FILE_NAME,
+                      FileOutputCommitter.class,
+                      new String[] {});
+    
+    // check with a failed job
+    testFailedJob(null, 
+                  FileOutputCommitter.class, 
+                  new String[] {FileOutputCommitter.SUCCEEDED_FILE_NAME});
+    
+    // check default abort job kill
+    testKilledJob(null, 
+                  FileOutputCommitter.class, 
+                  new String[] {FileOutputCommitter.SUCCEEDED_FILE_NAME});
+  }
+  
+  /**
+   * Test if a failed job with custom committer runs the abort code.
+   * 
+   * @throws IOException
+   */
+  public void testCustomAbort() throws IOException {
+    // check with a successful job
+    testSuccessfulJob(FileOutputCommitter.SUCCEEDED_FILE_NAME, 
+                      CommitterWithCustomAbort.class,
+                      new String[] {ABORT_FAILED_FILE_NAME, 
+                                    ABORT_KILLED_FILE_NAME});
+    
+    // check with a failed job
+    testFailedJob(ABORT_FAILED_FILE_NAME, CommitterWithCustomAbort.class, 
+                  new String[] {FileOutputCommitter.SUCCEEDED_FILE_NAME,
+                                ABORT_KILLED_FILE_NAME});
+    
+    // check with a killed job
+    testKilledJob(ABORT_KILLED_FILE_NAME, CommitterWithCustomAbort.class, 
+                  new String[] {FileOutputCommitter.SUCCEEDED_FILE_NAME,
+                                ABORT_FAILED_FILE_NAME});
+  }
+}

+ 4 - 2
src/test/org/apache/hadoop/mapred/TestJobName.java

@@ -61,7 +61,8 @@ public class TestJobName extends ClusterMapReduceTestCase {
 
     Path[] outputFiles = FileUtil.stat2Paths(
                            getFileSystem().listStatus(getOutputDir(),
-                           new OutputLogFilter()));
+                           new Utils.OutputFileUtils.OutputFilesFilter()));
+
     assertEquals(1, outputFiles.length);
     InputStream is = getFileSystem().open(outputFiles[0]);
     BufferedReader reader = new BufferedReader(new InputStreamReader(is));
@@ -95,7 +96,8 @@ public class TestJobName extends ClusterMapReduceTestCase {
 
     Path[] outputFiles = FileUtil.stat2Paths(
                            getFileSystem().listStatus(getOutputDir(),
-                           new OutputLogFilter()));
+                           new Utils.OutputFileUtils.OutputFilesFilter()));
+
     assertEquals(1, outputFiles.length);
     InputStream is = getFileSystem().open(outputFiles[0]);
     BufferedReader reader = new BufferedReader(new InputStreamReader(is));

+ 3 - 2
src/test/org/apache/hadoop/mapred/TestMiniMRClasspath.java

@@ -77,7 +77,7 @@ public class TestMiniMRClasspath extends TestCase {
     {
       Path[] parents = FileUtil.stat2Paths(fs.listStatus(outDir.getParent()));
       Path[] fileList = FileUtil.stat2Paths(fs.listStatus(outDir,
-              new OutputLogFilter()));
+          new Utils.OutputFileUtils.OutputFilesFilter()));
       for(int i=0; i < fileList.length; ++i) {
         BufferedReader file = 
           new BufferedReader(new InputStreamReader(fs.open(fileList[i])));
@@ -132,7 +132,8 @@ public class TestMiniMRClasspath extends TestCase {
     JobClient.runJob(conf);
     StringBuffer result = new StringBuffer();
     Path[] fileList = FileUtil.stat2Paths(fs.listStatus(outDir,
-                                 new OutputLogFilter()));
+        new Utils.OutputFileUtils.OutputFilesFilter()));
+
     for (int i = 0; i < fileList.length; ++i) {
       BufferedReader file = new BufferedReader(new InputStreamReader(
                                                                      fs.open(fileList[i])));

+ 2 - 1
src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java

@@ -101,7 +101,8 @@ public class TestMiniMRWithDFS extends TestCase {
     {
       
       Path[] fileList = FileUtil.stat2Paths(fs.listStatus(outDir,
-                                   new OutputLogFilter()));
+          new Utils.OutputFileUtils.OutputFilesFilter()));
+
       for(int i=0; i < fileList.length; ++i) {
         LOG.info("File list[" + i + "]" + ": "+ fileList[i]);
         BufferedReader file = 

+ 4 - 4
src/test/org/apache/hadoop/mapred/TestSetupAndCleanupFailure.java

@@ -49,10 +49,10 @@ public class TestSetupAndCleanupFailure extends TestCase {
     }
   }
 
-  // Commiter with cleanupJob throwing exception
+  // Committer with commitJob throwing exception
   static class CommitterWithFailCleanup extends FileOutputCommitter {
     @Override
-    public void cleanupJob(JobContext context) throws IOException {
+    public void commitJob(JobContext context) throws IOException {
       throw new IOException();
     }
   }
@@ -78,9 +78,9 @@ public class TestSetupAndCleanupFailure extends TestCase {
     }
     
     @Override
-    public void cleanupJob(JobContext context) throws IOException {
+    public void commitJob(JobContext context) throws IOException {
       waitForSignalFile(FileSystem.get(context.getJobConf()), cleanupSignalFile);
-      super.cleanupJob(context);
+      super.commitJob(context);
     }
   }
   
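
Because cleanupJob() used to run regardless of how the job ended, committers that overrode it (like this one) now pick the hook that matches their intent: commitJob() for success-only work, abortJob() for failed or killed jobs. A hedged migration sketch, assuming the int-valued run state this patch adds to the old mapred API (the class and comments below are hypothetical, not code from the patch):

    static class MigratedCommitter extends FileOutputCommitter {
      @Override
      public void commitJob(JobContext context) throws IOException {
        super.commitJob(context);           // also writes the _SUCCESS marker
        // ... work that should only happen for successful jobs ...
      }

      @Override
      public void abortJob(JobContext context, int runState) throws IOException {
        super.abortJob(context, runState);  // runState: JobStatus.FAILED or JobStatus.KILLED
        // ... cleanup for jobs that did not succeed ...
      }
    }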

+ 1 - 1
src/test/org/apache/hadoop/mapred/TestUserDefinedCounters.java

@@ -80,7 +80,7 @@ public class TestUserDefinedCounters extends ClusterMapReduceTestCase {
 
     Path[] outputFiles = FileUtil.stat2Paths(
                            getFileSystem().listStatus(getOutputDir(),
-                           new OutputLogFilter()));
+                           new Utils.OutputFileUtils.OutputFilesFilter()));
     if (outputFiles.length > 0) {
       InputStream is = getFileSystem().open(outputFiles[0]);
       BufferedReader reader = new BufferedReader(new InputStreamReader(is));

+ 4 - 2
src/test/org/apache/hadoop/mapred/join/TestDatamerge.java

@@ -22,10 +22,10 @@ import java.io.DataOutput;
 import java.io.IOException;
 import java.util.Iterator;
 
+import junit.extensions.TestSetup;
 import junit.framework.Test;
 import junit.framework.TestCase;
 import junit.framework.TestSuite;
-import junit.extensions.TestSetup;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
@@ -50,6 +50,7 @@ import org.apache.hadoop.mapred.Reducer;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.SequenceFileInputFormat;
 import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.hadoop.mapred.Utils;
 import org.apache.hadoop.mapred.lib.IdentityMapper;
 import org.apache.hadoop.mapred.lib.IdentityReducer;
 import org.apache.hadoop.util.ReflectionUtils;
@@ -317,7 +318,8 @@ public class TestDatamerge extends TestCase {
     job.setOutputFormat(SequenceFileOutputFormat.class);
     JobClient.runJob(job);
 
-    FileStatus[] outlist = cluster.getFileSystem().listStatus(outf);
+    FileStatus[] outlist = cluster.getFileSystem().listStatus(outf, 
+        new Utils.OutputFileUtils.OutputFilesFilter());
     assertEquals(1, outlist.length);
     assertTrue(0 < outlist[0].getLen());
     SequenceFile.Reader r =

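Utils.OutputFileUtils.OutputFilesFilter is just a PathFilter handed to FileSystem.listStatus(), so the same effect can be approximated inline. A hypothetical equivalent (not the patch's own implementation), reusing `outf` from the hunk above and an assumed FileSystem `fs`:

    // Hypothetical filter with the same intent: skip _logs, _SUCCESS and other
    // underscore- or dot-prefixed entries when listing job output.
    PathFilter visibleOnly = new PathFilter() {
      public boolean accept(Path path) {
        String name = path.getName();
        return !name.startsWith("_") && !name.startsWith(".");
      }
    };
    FileStatus[] outlist = fs.listStatus(outf, visibleOnly);
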
+ 7 - 3
src/test/org/apache/hadoop/mapred/lib/TestKeyFieldBasedComparator.java

@@ -18,7 +18,11 @@
 
 package org.apache.hadoop.mapred.lib;
 
-import java.io.*;
+import java.io.BufferedReader;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
@@ -30,10 +34,10 @@ import org.apache.hadoop.mapred.FileOutputFormat;
 import org.apache.hadoop.mapred.HadoopTestCase;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.OutputLogFilter;
 import org.apache.hadoop.mapred.RunningJob;
 import org.apache.hadoop.mapred.TextInputFormat;
 import org.apache.hadoop.mapred.TextOutputFormat;
+import org.apache.hadoop.mapred.Utils;
 
 
 public class TestKeyFieldBasedComparator extends HadoopTestCase {
@@ -94,7 +98,7 @@ public class TestKeyFieldBasedComparator extends HadoopTestCase {
     }
     Path[] outputFiles = FileUtil.stat2Paths(
         getFileSystem().listStatus(outDir,
-        new OutputLogFilter()));
+        new Utils.OutputFileUtils.OutputFilesFilter()));
     if (outputFiles.length > 0) {
       InputStream is = getFileSystem().open(outputFiles[0]);
       BufferedReader reader = new BufferedReader(new InputStreamReader(is));

+ 7 - 7
src/test/org/apache/hadoop/mapred/pipes/TestPipes.java

@@ -20,30 +20,30 @@ package org.apache.hadoop.mapred.pipes;
 
 import java.io.DataOutputStream;
 import java.io.IOException;
-import java.util.List;
 import java.util.ArrayList;
+import java.util.List;
+
+import junit.framework.TestCase;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.mapred.Counters;
 import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.FileOutputFormat;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.MiniMRCluster;
-import org.apache.hadoop.mapred.OutputLogFilter;
 import org.apache.hadoop.mapred.RunningJob;
 import org.apache.hadoop.mapred.TestMiniMRWithDFS;
+import org.apache.hadoop.mapred.Utils;
 import org.apache.hadoop.mapred.Counters.Counter;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.ToolRunner;
 
-import junit.framework.TestCase;
-
 public class TestPipes extends TestCase {
   private static final Log LOG =
     LogFactory.getLog(TestPipes.class.getName());
@@ -180,7 +180,7 @@ public class TestPipes extends TestCase {
 
     List<String> results = new ArrayList<String>();
     for (Path p:FileUtil.stat2Paths(dfs.getFileSystem().listStatus(outputPath,
-    		                        new OutputLogFilter()))) {
+        new Utils.OutputFileUtils.OutputFilesFilter()))) {
       results.add(TestMiniMRWithDFS.readOutput(p, job));
     }
     assertEquals("number of reduces is wrong", 

+ 2 - 5
src/test/org/apache/hadoop/mapreduce/MapReduceTestUtil.java

@@ -37,10 +37,7 @@ import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapred.OutputLogFilter;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapred.Utils;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 
@@ -295,7 +292,7 @@ public class MapReduceTestUtil {
     StringBuffer result = new StringBuffer();
 
     Path[] fileList = FileUtil.stat2Paths(fs.listStatus(outDir,
-           new OutputLogFilter()));
+        new Utils.OutputFileUtils.OutputFilesFilter()));
     for (Path outputFile : fileList) {
       LOG.info("Path" + ": "+ outputFile);
       BufferedReader file =