瀏覽代碼

MAPREDUCE-6478. Add an option to skip cleanupJob stage or ignore cleanup failure during commitJob. (Junping Du via wangda)

Wangda Tan 9 年之前
父節點
當前提交
372ad270a0

+ 49 - 3
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java

@@ -64,9 +64,23 @@ public class FileOutputCommitter extends OutputCommitter {
   public static final String FILEOUTPUTCOMMITTER_ALGORITHM_VERSION =
   public static final String FILEOUTPUTCOMMITTER_ALGORITHM_VERSION =
       "mapreduce.fileoutputcommitter.algorithm.version";
       "mapreduce.fileoutputcommitter.algorithm.version";
   public static final int FILEOUTPUTCOMMITTER_ALGORITHM_VERSION_DEFAULT = 2;
   public static final int FILEOUTPUTCOMMITTER_ALGORITHM_VERSION_DEFAULT = 2;
+  // Skip cleanup _temporary folders under job's output directory
+  public static final String FILEOUTPUTCOMMITTER_CLEANUP_SKIPPED =
+      "mapreduce.fileoutputcommitter.cleanup.skipped";
+  public static final boolean
+      FILEOUTPUTCOMMITTER_CLEANUP_SKIPPED_DEFAULT = false;
+
+  // Ignore exceptions in cleanup _temporary folder under job's output directory
+  public static final String FILEOUTPUTCOMMITTER_CLEANUP_FAILURES_IGNORED =
+      "mapreduce.fileoutputcommitter.cleanup-failures.ignored";
+  public static final boolean
+      FILEOUTPUTCOMMITTER_CLEANUP_FAILURES_IGNORED_DEFAULT = false;
+
   private Path outputPath = null;
   private Path outputPath = null;
   private Path workPath = null;
   private Path workPath = null;
   private final int algorithmVersion;
   private final int algorithmVersion;
+  private final boolean skipCleanup;
+  private final boolean ignoreCleanupFailures;
 
 
   /**
   /**
    * Create a file output committer
    * Create a file output committer
@@ -101,6 +115,21 @@ public class FileOutputCommitter extends OutputCommitter {
     if (algorithmVersion != 1 && algorithmVersion != 2) {
     if (algorithmVersion != 1 && algorithmVersion != 2) {
       throw new IOException("Only 1 or 2 algorithm version is supported");
       throw new IOException("Only 1 or 2 algorithm version is supported");
     }
     }
+
+    // if skip cleanup
+    skipCleanup = conf.getBoolean(
+        FILEOUTPUTCOMMITTER_CLEANUP_SKIPPED,
+        FILEOUTPUTCOMMITTER_CLEANUP_SKIPPED_DEFAULT);
+
+    // if ignore failures in cleanup
+    ignoreCleanupFailures = conf.getBoolean(
+        FILEOUTPUTCOMMITTER_CLEANUP_FAILURES_IGNORED,
+        FILEOUTPUTCOMMITTER_CLEANUP_FAILURES_IGNORED_DEFAULT);
+
+    LOG.info("FileOutputCommitter skip cleanup _temporary folders under " +
+        "output directory:" + skipCleanup + ", ignore cleanup failures: " +
+        ignoreCleanupFailures);
+
     if (outputPath != null) {
     if (outputPath != null) {
       FileSystem fs = outputPath.getFileSystem(context.getConfiguration());
       FileSystem fs = outputPath.getFileSystem(context.getConfiguration());
       this.outputPath = fs.makeQualified(outputPath);
       this.outputPath = fs.makeQualified(outputPath);
@@ -327,8 +356,25 @@ public class FileOutputCommitter extends OutputCommitter {
         }
         }
       }
       }
 
 
-      // delete the _temporary folder and create a _done file in the o/p folder
-      cleanupJob(context);
+      if (skipCleanup) {
+        LOG.info("Skip cleanup the _temporary folders under job's output " +
+            "directory in commitJob.");
+      } else {
+        // delete the _temporary folder and create a _done file in the o/p
+        // folder
+        try {
+          cleanupJob(context);
+        } catch (IOException e) {
+          if (ignoreCleanupFailures) {
+            // swallow exceptions in cleanup as user configure to make sure
+            // commitJob could be success even when cleanup get failure.
+            LOG.error("Error in cleanup job, manually cleanup is needed.", e);
+          } else {
+            // throw back exception to fail commitJob.
+            throw e;
+          }
+        }
+      }
       // True if the job requires output.dir marked on successful job.
       // True if the job requires output.dir marked on successful job.
       // Note that by default it is set to true.
       // Note that by default it is set to true.
       if (context.getConfiguration().getBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, true)) {
       if (context.getConfiguration().getBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, true)) {
@@ -600,4 +646,4 @@ public class FileOutputCommitter extends OutputCommitter {
       LOG.warn("Output Path is null in recoverTask()");
       LOG.warn("Output Path is null in recoverTask()");
     }
     }
   }
   }
-}
+}

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -451,6 +451,9 @@ Release 2.8.0 - UNRELEASED
     YARN-2597. MiniYARNCluster should propagate reason for AHS not starting.
     YARN-2597. MiniYARNCluster should propagate reason for AHS not starting.
     (stevel)
     (stevel)
 
 
+    MAPREDUCE-6478. Add an option to skip cleanupJob stage or ignore cleanup
+    failure during commitJob. (Junping Du via wangda)
+
   OPTIMIZATIONS
   OPTIMIZATIONS
 
 
     YARN-3339. TestDockerContainerExecutor should pull a single image and not
     YARN-3339. TestDockerContainerExecutor should pull a single image and not