Просмотр исходного кода

MAPREDUCE-3982. Fixed FileOutputCommitter to not err out for an 'empty-job' whose tasks don't write any outputs. Contributed by Robert Joseph Evans.
svn merge --ignore-ancestry -c 1299047 ../../trunk/


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23.2@1299051 13f79535-47bb-0310-9956-ffa450edef68

Vinod Kumar Vavilapalli 13 лет назад
Родитель
Сommit
8436b524fc

+ 3 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -160,6 +160,9 @@ Release 0.23.2 - UNRELEASED
     MAPREDUCE-3975. Default value not set for Configuration parameter
     mapreduce.job.local.dir (Eric Payne via bobby)
 
+    MAPREDUCE-3982. Fixed FileOutputCommitter to not err out for an 'empty-job'
+    whose tasks don't write any outputs. (Robert Joseph Evans via vinodkv)
+
 Release 0.23.1 - 2012-02-17
 
   INCOMPATIBLE CHANGES

+ 4 - 4
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java

@@ -278,11 +278,11 @@ public class FileOutputCommitter extends OutputCommitter {
    */
   public void setupJob(JobContext context) throws IOException {
     if (hasOutputPath()) {
-      Path pendingJobAttemptsPath = getPendingJobAttemptsPath();
-      FileSystem fs = pendingJobAttemptsPath.getFileSystem(
+      Path jobAttemptPath = getJobAttemptPath(context);
+      FileSystem fs = jobAttemptPath.getFileSystem(
           context.getConfiguration());
-      if (!fs.mkdirs(pendingJobAttemptsPath)) {
-        LOG.error("Mkdirs failed to create " + pendingJobAttemptsPath);
+      if (!fs.mkdirs(jobAttemptPath)) {
+        LOG.error("Mkdirs failed to create " + jobAttemptPath);
       }
     } else {
       LOG.warn("Output Path is null in setupJob()");

+ 22 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java

@@ -122,6 +122,28 @@ public class TestFileOutputCommitter extends TestCase {
     assertEquals(output, expectedOutput.toString());
     FileUtil.fullyDelete(new File(outDir.toString()));
   }
+  
+  public void testEmptyOutput() throws Exception {
+    Job job = Job.getInstance();
+    FileOutputFormat.setOutputPath(job, outDir);
+    Configuration conf = job.getConfiguration();
+    conf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt);
+    JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
+    TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID);
+    FileOutputCommitter committer = new FileOutputCommitter(outDir, tContext);
+
+    // setup
+    committer.setupJob(jContext);
+    committer.setupTask(tContext);
+
+    // Do not write any output
+
+    // do commit
+    committer.commitTask(tContext);
+    committer.commitJob(jContext);
+    
+    FileUtil.fullyDelete(new File(outDir.toString()));
+  }
 
   @SuppressWarnings("unchecked")
   public void testAbort() throws IOException, InterruptedException {