瀏覽代碼

MAPREDUCE-5485. Allow repeating job commit by extending OutputCommitter API. Contributed by Junping Du

Jian He 9 年之前
父節點
當前提交
6502d59e73

+ 3 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -118,6 +118,9 @@ Trunk (Unreleased)
     MAPREDUCE-6407. Migrate MAPREDUCE nativetask build to new CMake framework
     (Alan Burlison via Colin P. McCabe)
 
+    MAPREDUCE-5485. Allow repeating job commit by extending OutputCommitter API
+    (Junping Du via jianhe)
+
   BUG FIXES
 
     MAPREDUCE-6191. Improve clearing stale state of Java serialization

+ 48 - 15
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java

@@ -304,10 +304,12 @@ public class MRAppMaster extends CompositeService {
     }
     
     boolean copyHistory = false;
+    committer = createOutputCommitter(conf);
     try {
       String user = UserGroupInformation.getCurrentUser().getShortUserName();
       Path stagingDir = MRApps.getStagingAreaDir(conf, user);
       FileSystem fs = getFileSystem(conf);
+
       boolean stagingExists = fs.exists(stagingDir);
       Path startCommitFile = MRApps.getStartJobCommitFile(conf, user, jobId);
       boolean commitStarted = fs.exists(startCommitFile);
@@ -344,17 +346,24 @@ public class MRAppMaster extends CompositeService {
               "before it crashed. Not retrying.";
           forcedState = JobStateInternal.FAILED;
         } else {
-          //The commit is still pending, commit error
-          shutDownMessage =
-              "Job commit from a prior MRAppMaster attempt is " +
-              "potentially in progress. Preventing multiple commit executions";
-          forcedState = JobStateInternal.ERROR;
+          if (isCommitJobRepeatable()) {
+            // cleanup previous half done commits if committer supports
+            // repeatable job commit.
+            errorHappenedShutDown = false;
+            cleanupInterruptedCommit(conf, fs, startCommitFile);
+          } else {
+            //The commit is still pending, commit error
+            shutDownMessage =
+                "Job commit from a prior MRAppMaster attempt is " +
+                "potentially in progress. Preventing multiple commit executions";
+            forcedState = JobStateInternal.ERROR;
+          }
         }
       }
     } catch (IOException e) {
       throw new YarnRuntimeException("Error while initializing", e);
     }
-    
+
     if (errorHappenedShutDown) {
       NoopEventHandler eater = new NoopEventHandler();
       //We do not have a JobEventDispatcher in this path
@@ -400,7 +409,6 @@ public class MRAppMaster extends CompositeService {
         addIfService(cpHist);
       }
     } else {
-      committer = createOutputCommitter(conf);
 
       //service to handle requests from JobClient
       clientService = createClientService(context);
@@ -486,6 +494,38 @@ public class MRAppMaster extends CompositeService {
     return new AsyncDispatcher();
   }
 
+  private boolean isCommitJobRepeatable() throws IOException {
+    boolean isRepeatable = false;
+    Configuration conf = getConfig();
+    if (committer != null) {
+      final JobContext jobContext = getJobContextFromConf(conf);
+
+      isRepeatable = callWithJobClassLoader(conf,
+          new ExceptionAction<Boolean>() {
+            public Boolean call(Configuration conf) throws IOException {
+              return committer.isCommitJobRepeatable(jobContext);
+            }
+          });
+    }
+    return isRepeatable;
+  }
+
+  private JobContext getJobContextFromConf(Configuration conf) {
+    if (newApiCommitter) {
+      return new JobContextImpl(conf, TypeConverter.fromYarn(getJobId()));
+    } else {
+      return new org.apache.hadoop.mapred.JobContextImpl(
+          new JobConf(conf), TypeConverter.fromYarn(getJobId()));
+    }
+  }
+
+  private void cleanupInterruptedCommit(Configuration conf,
+      FileSystem fs, Path startCommitFile) throws IOException {
+    LOG.info("Delete startJobCommitFile in case commit is not finished as " +
+        "successful or failed.");
+    fs.delete(startCommitFile, false);
+  }
+
   private OutputCommitter createOutputCommitter(Configuration conf) {
     return callWithJobClassLoader(conf, new Action<OutputCommitter>() {
       public OutputCommitter call(Configuration conf) {
@@ -1212,14 +1252,7 @@ public class MRAppMaster extends CompositeService {
     boolean isSupported = false;
     Configuration conf = getConfig();
     if (committer != null) {
-      final JobContext _jobContext;
-      if (newApiCommitter) {
-         _jobContext = new JobContextImpl(
-            conf, TypeConverter.fromYarn(getJobId()));
-      } else {
-          _jobContext = new org.apache.hadoop.mapred.JobContextImpl(
-                new JobConf(conf), TypeConverter.fromYarn(getJobId()));
-      }
+      final JobContext _jobContext = getJobContextFromConf(conf);
       isSupported = callWithJobClassLoader(conf,
           new ExceptionAction<Boolean>() {
             public Boolean call(Configuration conf) throws IOException {

+ 18 - 7
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/commit/CommitterEventHandler.java

@@ -261,27 +261,38 @@ public class CommitterEventHandler extends AbstractService
       }
     }
 
-    private void touchz(Path p) throws IOException {
-      fs.create(p, false).close();
+    // If job commit is repeatable, then we should allow
+    // startCommitFile/endCommitSuccessFile/endCommitFailureFile to be written
+    // by other AM before.
+    private void touchz(Path p, boolean overwrite) throws IOException {
+      fs.create(p, overwrite).close();
     }
-    
+
     @SuppressWarnings("unchecked")
     protected void handleJobCommit(CommitterJobCommitEvent event) {
+      boolean commitJobIsRepeatable = false;
       try {
-        touchz(startCommitFile);
+        commitJobIsRepeatable = committer.isCommitJobRepeatable(
+            event.getJobContext());
+      } catch (IOException e) {
+        LOG.warn("Exception in committer.isCommitJobRepeatable():", e);
+      }
+
+      try {
+        touchz(startCommitFile, commitJobIsRepeatable);
         jobCommitStarted();
         waitForValidCommitWindow();
         committer.commitJob(event.getJobContext());
-        touchz(endCommitSuccessFile);
+        touchz(endCommitSuccessFile, commitJobIsRepeatable);
         context.getEventHandler().handle(
             new JobCommitCompletedEvent(event.getJobID()));
       } catch (Exception e) {
+        LOG.error("Could not commit job", e);
         try {
-          touchz(endCommitFailureFile);
+          touchz(endCommitFailureFile, commitJobIsRepeatable);
         } catch (Exception e2) {
           LOG.error("could not create failure file.", e2);
         }
-        LOG.error("Could not commit job", e);
         context.getEventHandler().handle(
             new JobCommitFailedEvent(event.getJobID(),
                 StringUtils.stringifyException(e)));

+ 6 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileOutputCommitter.java

@@ -182,13 +182,18 @@ public class FileOutputCommitter extends OutputCommitter {
   throws IOException {
     return getWrapped(context).needsTaskCommit(context, getTaskAttemptPath(context));
   }
-  
+
   @Override
   @Deprecated
   public boolean isRecoverySupported() {
     return true;
   }
 
+  @Override
+  public boolean isCommitJobRepeatable(JobContext context) throws IOException {
+    return getWrapped(context).isCommitJobRepeatable(context);
+  }
+
   @Override
   public boolean isRecoverySupported(JobContext context) throws IOException {
     return getWrapped(context).isRecoverySupported(context);

+ 33 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/OutputCommitter.java

@@ -192,7 +192,7 @@ public abstract class OutputCommitter
    * 
    * If task output recovery is supported, job restart can be done more
    * efficiently.
-   * 
+   *
    * @param jobContext
    *          Context of the job whose output is being written.
    * @return <code>true</code> if task output recovery is supported,
@@ -204,6 +204,38 @@ public abstract class OutputCommitter
     return isRecoverySupported();
   }
 
+  /**
+   * Returns true if an in-progress job commit can be retried. If the MR AM is
+   * re-run then it will check this value to determine if it can retry an
+   * in-progress commit that was started by a previous version.
+   * Note that in rare scenarios, the previous AM version might still be running
+   * at that time, due to system anomalies. Hence if this method returns true
+   * then the retry commit operation should be able to run concurrently with
+   * the previous operation.
+   *
+   * If repeatable job commit is supported, job restart can tolerate previous
+   * AM failures during job commit.
+   *
+   * By default, it is not supported. Extended classes (like:
+   * FileOutputCommitter) should explicitly override it if provide support.
+   *
+   * @param jobContext
+   *          Context of the job whose output is being written.
+   * @return <code>true</code> repeatable job commit is supported,
+   *         <code>false</code> otherwise
+   * @throws IOException
+   */
+  public boolean isCommitJobRepeatable(JobContext jobContext) throws
+      IOException {
+    return false;
+  }
+
+  @Override
+  public boolean isCommitJobRepeatable(org.apache.hadoop.mapreduce.JobContext
+      jobContext) throws IOException {
+    return isCommitJobRepeatable((JobContext) jobContext);
+  }
+
   /**
    * Recover the task output. 
    * 

+ 26 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/OutputCommitter.java

@@ -189,6 +189,32 @@ public abstract class OutputCommitter {
     return false;
   }
 
+  /**
+   * Returns true if an in-progress job commit can be retried. If the MR AM is
+   * re-run then it will check this value to determine if it can retry an
+   * in-progress commit that was started by a previous version.
+   * Note that in rare scenarios, the previous AM version might still be running
+   * at that time, due to system anomalies. Hence if this method returns true
+   * then the retry commit operation should be able to run concurrently with
+   * the previous operation.
+   *
+   * If repeatable job commit is supported, job restart can tolerate previous
+   * AM failures during job commit.
+   *
+   * By default, it is not supported. Extended classes (like:
+   * FileOutputCommitter) should explicitly override it if provide support.
+   *
+   * @param jobContext
+   *          Context of the job whose output is being written.
+   * @return <code>true</code> repeatable job commit is supported,
+   *         <code>false</code> otherwise
+   * @throws IOException
+   */
+  public boolean isCommitJobRepeatable(JobContext jobContext)
+      throws IOException {
+    return false;
+  }
+
   /**
    * Is task output recovery supported for restarting jobs?
    * 

+ 65 - 6
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java

@@ -38,6 +38,8 @@ import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /** An {@link OutputCommitter} that commits files specified 
  * in job output directory i.e. ${mapreduce.output.fileoutputformat.outputdir}.
  **/
@@ -76,6 +78,13 @@ public class FileOutputCommitter extends OutputCommitter {
   public static final boolean
       FILEOUTPUTCOMMITTER_CLEANUP_FAILURES_IGNORED_DEFAULT = false;
 
+  // Number of attempts when failure happens in commit job
+  public static final String FILEOUTPUTCOMMITTER_FAILURE_ATTEMPTS =
+      "mapreduce.fileoutputcommitter.failures.attempts";
+
+  // default value to be 1 to keep consistent with previous behavior
+  public static final int FILEOUTPUTCOMMITTER_FAILURE_ATTEMPTS_DEFAULT = 1;
+
   private Path outputPath = null;
   private Path workPath = null;
   private final int algorithmVersion;
@@ -340,12 +349,40 @@ public class FileOutputCommitter extends OutputCommitter {
   }
 
   /**
-   * The job has completed so move all committed tasks to the final output dir.
+   * The job has completed, so do works in commitJobInternal().
+   * Could retry on failure if using algorithm 2.
+   * @param context the job's context
+   */
+  public void commitJob(JobContext context) throws IOException {
+    int maxAttemptsOnFailure = isCommitJobRepeatable(context) ?
+        context.getConfiguration().getInt(FILEOUTPUTCOMMITTER_FAILURE_ATTEMPTS,
+            FILEOUTPUTCOMMITTER_FAILURE_ATTEMPTS_DEFAULT) : 1;
+    int attempt = 0;
+    boolean jobCommitNotFinished = true;
+    while (jobCommitNotFinished) {
+      try {
+        commitJobInternal(context);
+        jobCommitNotFinished = false;
+      } catch (Exception e) {
+        if (++attempt >= maxAttemptsOnFailure) {
+          throw e;
+        } else {
+          LOG.warn("Exception get thrown in job commit, retry (" + attempt +
+              ") time.", e);
+        }
+      }
+    }
+  }
+
+  /**
+   * The job has completed, so do following commit job, include:
+   * Move all committed tasks to the final output dir (algorithm 1 only).
    * Delete the temporary directory, including all of the work directories.
    * Create a _SUCCESS file to make it as successful.
    * @param context the job's context
    */
-  public void commitJob(JobContext context) throws IOException {
+  @VisibleForTesting
+  protected void commitJobInternal(JobContext context) throws IOException {
     if (hasOutputPath()) {
       Path finalOutput = getOutputPath();
       FileSystem fs = finalOutput.getFileSystem(context.getConfiguration());
@@ -377,9 +414,17 @@ public class FileOutputCommitter extends OutputCommitter {
       }
       // True if the job requires output.dir marked on successful job.
       // Note that by default it is set to true.
-      if (context.getConfiguration().getBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, true)) {
+      if (context.getConfiguration().getBoolean(
+          SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, true)) {
         Path markerPath = new Path(outputPath, SUCCEEDED_FILE_NAME);
-        fs.create(markerPath).close();
+        // If job commit is repeatable and previous/another AM could write
+        // mark file already, we need to set overwritten to be true explicitly
+        // in case other FS implementations don't overwritten by default.
+        if (isCommitJobRepeatable(context)) {
+          fs.create(markerPath, true).close();
+        } else {
+          fs.create(markerPath).close();
+        }
       }
     } else {
       LOG.warn("Output Path is null in commitJob()");
@@ -458,7 +503,16 @@ public class FileOutputCommitter extends OutputCommitter {
       Path pendingJobAttemptsPath = getPendingJobAttemptsPath();
       FileSystem fs = pendingJobAttemptsPath
           .getFileSystem(context.getConfiguration());
-      fs.delete(pendingJobAttemptsPath, true);
+      // if job allow repeatable commit and pendingJobAttemptsPath could be
+      // deleted by previous AM, we should tolerate FileNotFoundException in
+      // this case.
+      try {
+        fs.delete(pendingJobAttemptsPath, true);
+      } catch (FileNotFoundException e) {
+        if (!isCommitJobRepeatable(context)) {
+          throw e;
+        }
+      }
     } else {
       LOG.warn("Output Path is null in cleanupJob()");
     }
@@ -594,7 +648,12 @@ public class FileOutputCommitter extends OutputCommitter {
   public boolean isRecoverySupported() {
     return true;
   }
-  
+
+  @Override
+  public boolean isCommitJobRepeatable(JobContext context) throws IOException {
+    return algorithmVersion == 2;
+  }
+
   @Override
   public void recoverTask(TaskAttemptContext context)
       throws IOException {

+ 151 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileOutputCommitter.java

@@ -24,6 +24,7 @@ import java.io.IOException;
 import java.net.URI;
 
 import junit.framework.TestCase;
+import org.junit.Assert;
 
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -202,6 +203,112 @@ public class TestFileOutputCommitter extends TestCase {
     assert(dataFileFound && indexFileFound);
   }
 
+  public void testCommitterWithFailureV1() throws Exception {
+    testCommitterWithFailureInternal(1, 1);
+    testCommitterWithFailureInternal(1, 2);
+  }
+
+  public void testCommitterWithFailureV2() throws Exception {
+    testCommitterWithFailureInternal(2, 1);
+    testCommitterWithFailureInternal(2, 2);
+  }
+
+  private void testCommitterWithFailureInternal(int version, int maxAttempts) throws Exception {
+    JobConf conf = new JobConf();
+    FileOutputFormat.setOutputPath(conf, outDir);
+    conf.set(JobContext.TASK_ATTEMPT_ID, attempt);
+    conf.setInt(org.apache.hadoop.mapreduce.lib.output.
+        FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION, version);
+    conf.setInt(org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.
+        FILEOUTPUTCOMMITTER_FAILURE_ATTEMPTS, maxAttempts);
+    JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
+    TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID);
+    FileOutputCommitter committer = new CommitterWithFailedThenSucceed();
+
+    // setup
+    committer.setupJob(jContext);
+    committer.setupTask(tContext);
+
+    // write output
+    TextOutputFormat theOutputFormat = new TextOutputFormat();
+    RecordWriter theRecordWriter =
+        theOutputFormat.getRecordWriter(null, conf, partFile, null);
+    writeOutput(theRecordWriter, tContext);
+
+    // do commit
+    if(committer.needsTaskCommit(tContext)) {
+      committer.commitTask(tContext);
+    }
+
+    try {
+      committer.commitJob(jContext);
+      // (1,1), (1,2), (2,1) shouldn't reach to here.
+      if (version == 1 || maxAttempts <= 1) {
+        Assert.fail("Commit successful: wrong behavior for version 1.");
+      }
+    } catch (IOException e) {
+      // (2,2) shouldn't reach to here.
+      if (version == 2 && maxAttempts > 2) {
+        Assert.fail("Commit failed: wrong behavior for version 2.");
+      }
+    }
+
+    FileUtil.fullyDelete(new File(outDir.toString()));
+  }
+
+  public void testCommitterWithDuplicatedCommitV1() throws Exception {
+    testCommitterWithDuplicatedCommitInternal(1);
+  }
+
+  public void testCommitterWithDuplicatedCommitV2() throws Exception {
+    testCommitterWithDuplicatedCommitInternal(2);
+  }
+
+  private void testCommitterWithDuplicatedCommitInternal(int version) throws
+      Exception {
+    JobConf conf = new JobConf();
+    FileOutputFormat.setOutputPath(conf, outDir);
+    conf.set(JobContext.TASK_ATTEMPT_ID, attempt);
+    conf.setInt(org.apache.hadoop.mapreduce.lib.output.
+        FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION, version);
+    JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
+    TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID);
+    FileOutputCommitter committer = new FileOutputCommitter();
+
+    // setup
+    committer.setupJob(jContext);
+    committer.setupTask(tContext);
+
+    // write output
+    TextOutputFormat theOutputFormat = new TextOutputFormat();
+    RecordWriter theRecordWriter =
+        theOutputFormat.getRecordWriter(null, conf, partFile, null);
+    writeOutput(theRecordWriter, tContext);
+
+    // do commit
+    if(committer.needsTaskCommit(tContext)) {
+      committer.commitTask(tContext);
+    }
+    committer.commitJob(jContext);
+
+    // validate output
+    validateContent(outDir);
+
+    // commit again
+    try {
+      committer.commitJob(jContext);
+      if (version == 1) {
+        Assert.fail("Duplicate commit successful: wrong behavior " +
+            "for version 1.");
+      }
+    } catch (IOException e) {
+      if (version == 2) {
+        Assert.fail("Duplicate commit failed: wrong behavior for version 2.");
+      }
+    }
+    FileUtil.fullyDelete(new File(outDir.toString()));
+  }
+
   private void testCommitterInternal(int version) throws Exception {
     JobConf conf = new JobConf();
     FileOutputFormat.setOutputPath(conf, outDir);
@@ -451,4 +558,48 @@ public class TestFileOutputCommitter extends TestCase {
     return contents;
   }
 
+  /**
+   * The class provides a overrided implementation of commitJobInternal which
+   * causes the commit failed for the first time then succeed.
+   */
+  public static class CommitterWithFailedThenSucceed extends
+      FileOutputCommitter {
+    boolean firstTimeFail = true;
+
+    public CommitterWithFailedThenSucceed() throws IOException {
+      super();
+    }
+
+    @Override
+    public void commitJob(JobContext context) throws IOException {
+      JobConf conf = context.getJobConf();
+      org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter wrapped =
+          new CommitterFailedFirst(FileOutputFormat.getOutputPath(conf),
+              context);
+      wrapped.commitJob(context);
+    }
+  }
+
+  public static class CommitterFailedFirst extends
+      org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter {
+    boolean firstTimeFail = true;
+
+    public CommitterFailedFirst(Path outputPath,
+        JobContext context) throws IOException {
+      super(outputPath, context);
+    }
+
+    @Override
+    protected void commitJobInternal(org.apache.hadoop.mapreduce.JobContext
+        context) throws IOException {
+      super.commitJobInternal(context);
+      if (firstTimeFail) {
+        firstTimeFail = false;
+        throw new IOException();
+      } else {
+        // succeed then, nothing to do
+      }
+    }
+  }
+
 }

+ 196 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java

@@ -20,6 +20,7 @@ package org.apache.hadoop.mapreduce.lib.output;
 
 import java.io.File;
 import java.io.FileInputStream;
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.net.URI;
 import java.util.concurrent.Callable;
@@ -28,6 +29,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
 import junit.framework.TestCase;
+import org.junit.Assert;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -283,6 +285,174 @@ public class TestFileOutputCommitter extends TestCase {
   public void testCommitterV2() throws Exception {
     testCommitterInternal(2);
   }
+  
+  public void testCommitterWithDuplicatedCommitV1() throws Exception {
+    testCommitterWithDuplicatedCommitInternal(1);
+  }
+
+  public void testCommitterWithDuplicatedCommitV2() throws Exception {
+    testCommitterWithDuplicatedCommitInternal(2);
+  }
+
+  private void testCommitterWithDuplicatedCommitInternal(int version) throws
+      Exception {
+    Job job = Job.getInstance();
+    FileOutputFormat.setOutputPath(job, outDir);
+    Configuration conf = job.getConfiguration();
+    conf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt);
+    conf.setInt(FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION,
+        version);
+    JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
+    TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID);
+    FileOutputCommitter committer = new FileOutputCommitter(outDir, tContext);
+
+    // setup
+    committer.setupJob(jContext);
+    committer.setupTask(tContext);
+
+    // write output
+    TextOutputFormat theOutputFormat = new TextOutputFormat();
+    RecordWriter theRecordWriter = theOutputFormat.getRecordWriter(tContext);
+    writeOutput(theRecordWriter, tContext);
+
+    // do commit
+    committer.commitTask(tContext);
+    committer.commitJob(jContext);
+
+    // validate output
+    validateContent(outDir);
+
+    // commit job again on a successful commit job.
+    try {
+      committer.commitJob(jContext);
+      if (version == 1) {
+        Assert.fail("Duplicate commit success: wrong behavior for version 1.");
+      }
+    } catch (IOException e) {
+      if (version == 2) {
+        Assert.fail("Duplicate commit failed: wrong behavior for version 2.");
+      }
+    }
+    FileUtil.fullyDelete(new File(outDir.toString()));
+  }
+
+  public void testCommitterWithFailureV1() throws Exception {
+    testCommitterWithFailureInternal(1, 1);
+    testCommitterWithFailureInternal(1, 2);
+  }
+
+  public void testCommitterWithFailureV2() throws Exception {
+    testCommitterWithFailureInternal(2, 1);
+    testCommitterWithFailureInternal(2, 2);
+  }
+
+  private void testCommitterWithFailureInternal(int version, int maxAttempts)
+      throws Exception {
+    Job job = Job.getInstance();
+    FileOutputFormat.setOutputPath(job, outDir);
+    Configuration conf = job.getConfiguration();
+    conf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt);
+    conf.setInt(FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION,
+        version);
+    conf.setInt(FileOutputCommitter.FILEOUTPUTCOMMITTER_FAILURE_ATTEMPTS,
+        maxAttempts);
+
+    JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
+    TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID);
+    FileOutputCommitter committer = new CommitterWithFailedThenSucceed(outDir,
+        tContext);
+
+    // setup
+    committer.setupJob(jContext);
+    committer.setupTask(tContext);
+
+    // write output
+    TextOutputFormat theOutputFormat = new TextOutputFormat();
+    RecordWriter theRecordWriter = theOutputFormat.getRecordWriter(tContext);
+    writeOutput(theRecordWriter, tContext);
+
+    // do commit
+    committer.commitTask(tContext);
+
+    try {
+      committer.commitJob(jContext);
+      // (1,1), (1,2), (2,1) shouldn't reach to here.
+      if (version == 1 || maxAttempts <= 1) {
+        Assert.fail("Commit successful: wrong behavior for version 1.");
+      }
+    } catch (IOException e) {
+      // (2,2) shouldn't reach to here.
+      if (version == 2 && maxAttempts > 2) {
+        Assert.fail("Commit failed: wrong behavior for version 2.");
+      }
+    }
+
+    FileUtil.fullyDelete(new File(outDir.toString()));
+  }
+
+  public void testCommitterRepeatableV1() throws Exception {
+    testCommitterRetryInternal(1);
+  }
+
+  public void testCommitterRepeatableV2() throws Exception {
+    testCommitterRetryInternal(2);
+  }
+
+  // retry committer for 2 times.
+  private void testCommitterRetryInternal(int version)
+      throws Exception {
+    Job job = Job.getInstance();
+    FileOutputFormat.setOutputPath(job, outDir);
+    Configuration conf = job.getConfiguration();
+    conf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt);
+    conf.setInt(FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION,
+        version);
+    // only attempt for 1 time.
+    conf.setInt(FileOutputCommitter.FILEOUTPUTCOMMITTER_FAILURE_ATTEMPTS,
+        1);
+
+    JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
+    TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID);
+    FileOutputCommitter committer = new CommitterWithFailedThenSucceed(outDir,
+        tContext);
+
+    // setup
+    committer.setupJob(jContext);
+    committer.setupTask(tContext);
+
+    // write output
+    TextOutputFormat theOutputFormat = new TextOutputFormat();
+    RecordWriter theRecordWriter = theOutputFormat.getRecordWriter(tContext);
+    writeOutput(theRecordWriter, tContext);
+
+    // do commit
+    committer.commitTask(tContext);
+
+    try {
+      committer.commitJob(jContext);
+      Assert.fail("Commit successful: wrong behavior for the first time " +
+          "commit.");
+    } catch (IOException e) {
+      // commit again.
+      try {
+        committer.commitJob(jContext);
+        // version 1 shouldn't reach to here.
+        if (version == 1) {
+          Assert.fail("Commit successful after retry: wrong behavior for " +
+              "version 1.");
+        }
+      } catch (FileNotFoundException ex) {
+        if (version == 2) {
+          Assert.fail("Commit failed after retry: wrong behavior for" +
+              " version 2.");
+        }
+        assertTrue(ex.getMessage().contains(committer.getJobAttemptPath(
+            jContext).toString() + " does not exist"));
+      }
+    }
+
+    FileUtil.fullyDelete(new File(outDir.toString()));
+  }
 
   private void testMapFileOutputCommitterInternal(int version)
       throws Exception {
@@ -292,7 +462,7 @@ public class TestFileOutputCommitter extends TestCase {
     conf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt);
     conf.setInt(FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION,
         version);
-    JobContext jContext = new JobContextImpl(conf, taskID.getJobID());    
+    JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
     TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID);
     FileOutputCommitter committer = new FileOutputCommitter(outDir, tContext);
 
@@ -584,4 +754,29 @@ public class TestFileOutputCommitter extends TestCase {
     return contents;
   }
 
+  /**
+   * The class provides a overrided implementation of commitJobInternal which
+   * causes the commit failed for the first time then succeed.
+   */
+  public static class CommitterWithFailedThenSucceed extends
+      FileOutputCommitter {
+    boolean firstTimeFail = true;
+
+    public CommitterWithFailedThenSucceed(Path outputPath,
+        JobContext context) throws IOException {
+      super(outputPath, context);
+    }
+
+    @Override
+    protected void commitJobInternal(JobContext context) throws IOException {
+      super.commitJobInternal(context);
+      if (firstTimeFail) {
+        firstTimeFail = false;
+        throw new IOException();
+      } else {
+        // succeed then, nothing to do
+      }
+    }
+  }
+
 }