Преглед изворни кода

HADOOP-18568. magic committer optional cleanup

Closes #7693

Co-authored-by: Chris Nauroth <cnauroth@apache.org>
Signed-off-by: Chris Nauroth <cnauroth@apache.org>
Hossein Torabi пре 2 недеља
родитељ
комит
7acb89cb15

+ 12 - 0
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitConstants.java

@@ -258,6 +258,18 @@ public final class CommitConstants {
   public static final boolean FS_S3A_COMMITTER_MAGIC_TRACK_COMMITS_IN_MEMORY_ENABLED_DEFAULT =
       false;
 
+  /**
+   * Should the magic committer clean up all the staging dirs: {@value}.
+   */
+  public static final String FS_S3A_COMMITTER_MAGIC_CLEANUP_ENABLED =
+      "fs.s3a.committer.magic.cleanup.enabled";
+
+  /**
+   * Default value for {@link #FS_S3A_COMMITTER_MAGIC_CLEANUP_ENABLED}: {@value}.
+   */
+  public static final boolean FS_S3A_COMMITTER_MAGIC_CLEANUP_ENABLED_DEFAULT =
+      true;
+
   /**
    * Path  in the cluster filesystem for temporary data: {@value}.
    * This is for HDFS, not the local filesystem.

+ 12 - 0
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicCommitTrackerUtils.java

@@ -61,4 +61,16 @@ public final class MagicCommitTrackerUtils {
         CommitConstants.FS_S3A_COMMITTER_MAGIC_TRACK_COMMITS_IN_MEMORY_ENABLED,
         CommitConstants.FS_S3A_COMMITTER_MAGIC_TRACK_COMMITS_IN_MEMORY_ENABLED_DEFAULT);
   }
+
+  /**
+   * Is cleanup of the magic committer staging dirs enabled?
+   * @param conf configuration to read {@code fs.s3a.committer.magic.cleanup.enabled} from.
+   * @return true if cleanup of the staging dirs is enabled; the default applies when unset.
+   */
+  public static boolean isCleanupMagicCommitterEnabled(
+      Configuration conf) {
+    return conf.getBoolean(
+        CommitConstants.FS_S3A_COMMITTER_MAGIC_CLEANUP_ENABLED,
+        CommitConstants.FS_S3A_COMMITTER_MAGIC_CLEANUP_ENABLED_DEFAULT);
+  }
 }

+ 13 - 10
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicS3GuardCommitter.java

@@ -50,6 +50,7 @@ import static org.apache.hadoop.fs.s3a.commit.CommitConstants.TASK_ATTEMPT_ID;
 import static org.apache.hadoop.fs.s3a.commit.CommitConstants.TEMP_DATA;
 import static org.apache.hadoop.fs.s3a.commit.CommitUtils.*;
 import static org.apache.hadoop.fs.s3a.commit.impl.CommitUtilsWithMR.*;
+import static org.apache.hadoop.fs.s3a.commit.magic.MagicCommitTrackerUtils.isCleanupMagicCommitterEnabled;
 import static org.apache.hadoop.fs.s3a.commit.magic.MagicCommitTrackerUtils.isTrackMagicCommitsInMemoryEnabled;
 import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.demandStringifyIOStatistics;
 
@@ -131,16 +132,18 @@ public class MagicS3GuardCommitter extends AbstractS3ACommitter {
    * Delete the magic directory.
    */
   public void cleanupStagingDirs() {
-    final Path out = getOutputPath();
-    Path path = getMagicJobPath(getUUID(), out);
-    try(DurationInfo ignored = new DurationInfo(LOG, true,
-        "Deleting magic directory %s", path)) {
-      Invoker.ignoreIOExceptions(LOG, "cleanup magic directory", path.toString(),
-          () -> deleteWithWarning(getDestFS(), path, true));
-      // and the job temp directory with manifests
-      Invoker.ignoreIOExceptions(LOG, "cleanup job directory", path.toString(),
-          () -> deleteWithWarning(getDestFS(),
-              new Path(out, TEMP_DATA), true));
+    // Cleanup is optional: when fs.s3a.committer.magic.cleanup.enabled is
+    // false nothing is deleted here and the directories are left in place.
+    if (isCleanupMagicCommitterEnabled(getConf())) {
+      final Path out = getOutputPath();
+      final Path path = getMagicJobPath(getUUID(), out);
+      // the job temp directory with manifests
+      final Path tempDir = new Path(out, TEMP_DATA);
+      try (DurationInfo ignored = new DurationInfo(LOG, true,
+          "Deleting magic directory %s", path)) {
+        Invoker.ignoreIOExceptions(LOG, "cleanup magic directory", path.toString(),
+            () -> deleteWithWarning(getDestFS(), path, true));
+        // report the directory actually being deleted, not the magic path
+        Invoker.ignoreIOExceptions(LOG, "cleanup job directory", tempDir.toString(),
+            () -> deleteWithWarning(getDestFS(), tempDir, true));
+      }
     }
   }
 

+ 18 - 0
hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/committers.md

@@ -558,6 +558,7 @@ The table below provides a summary of each option.
 | `fs.s3a.committer.threads` | Number of threads in committers for parallel operations on files.| -4 |
 | `fs.s3a.committer.generate.uuid` | Generate a Job UUID if none is passed down from Spark | `false` |
 | `fs.s3a.committer.require.uuid` |Require the Job UUID to be passed down from Spark | `false` |
+| `fs.s3a.committer.magic.cleanup.enabled` | Clean up the magic path after the job is committed. | `true` |
 
 The examples below show how these options can be configured in XML.
 
@@ -1058,3 +1059,20 @@ one of the following conditions are met
 1. The committer is being used in spark, and the version of spark being used does not
    set the `spark.sql.sources.writeJobUUID` property.
    Either upgrade to a new spark release, or set `fs.s3a.committer.generate.uuid` to true.
+
+### Long Job Completion Time Due to Magic Committer Cleanup
+When using the S3A Magic Committer in large Spark or MapReduce jobs, job completion can be significantly delayed
+due to the cleanup of temporary files (such as those under the `__magic` directory).
+This happens because deleting many small files in S3 is a slow and expensive operation, especially at scale.
+In some cases, the cleanup phase alone can take several minutes or more — even after all data has already been written.
+
+To reduce this overhead, Hadoop 3.4.2 and later provide a configuration option
+([HADOOP-18568](https://issues.apache.org/jira/browse/HADOOP-18568)) that lets users disable this automatic cleanup
+and rely on S3 bucket lifecycle policies to delete the temporary files instead.
+
+#### Configuration
+```xml
+<property>
+  <name>fs.s3a.committer.magic.cleanup.enabled</name>
+  <value>false</value>
+</property>
+```

+ 28 - 0
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestMagicCommitProtocol.java

@@ -214,6 +214,34 @@ public class ITestMagicCommitProtocol extends AbstractITCommitProtocol {
         .contains(ta0);
   }
 
+  /**
+   * Verify that magic committer cleanup of the job attempt path can be disabled.
+   */
+  @Test
+  public void testCommitterCleanup() throws Throwable {
+    // NOTE(review): the descriptions below mention the task attempt path, but
+    // the assertions check the job attempt path.
+    // First job: the cleanup option is not changed here, so this presumably
+    // runs with cleanup enabled; confirm the test setup does not override it.
+    describe("Committer cleanup enabled. hence it should delete the task attempt path after commit");
+    JobData jobData = startJob(true);
+    JobContext jContext = jobData.getJContext();
+    TaskAttemptContext tContext = jobData.getTContext();
+    AbstractS3ACommitter committer = jobData.getCommitter();
+
+    commit(committer, jContext, tContext);
+    assertJobAttemptPathDoesNotExist(committer, jContext);
+
+    describe("Committer cleanup is disabled. hence it should not delete the task attempt path after commit");
+    JobData jobData2 = startJob(true);
+    JobContext jContext2 = jobData2.getJContext();
+    TaskAttemptContext tContext2 = jobData2.getTContext();
+    AbstractS3ACommitter committer2 = jobData2.getCommitter();
+
+    // Second job: switch cleanup off on the committer's own configuration
+    // before committing.
+    committer2.getConf().setBoolean(FS_S3A_COMMITTER_MAGIC_CLEANUP_ENABLED, false);
+
+
+    commit(committer2, jContext2, tContext2);
+    assertJobAttemptPathExists(committer2, jContext2);
+  }
+
+
   /**
    * The class provides a overridden implementation of commitJobInternal which
    * causes the commit failed for the first time then succeed.