Browse source

HADOOP-18793. S3A StagingCommitter does not clean up staging-uploads directory (#5818)

Contributed by Harunobu Daikoku
Harunobu Daikoku, 2 years ago
Parent
commit
f02fa6683d

+ 13 - 0
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/Paths.java

@@ -227,6 +227,19 @@ public final class Paths {
         MRJobConfig.APPLICATION_ATTEMPT_ID, 0);
         MRJobConfig.APPLICATION_ATTEMPT_ID, 0);
   }
   }
 
 
+  /**
+   * Build a qualified parent path for the temporary multipart upload commit
+   * directory built by {@link #getMultipartUploadCommitsDirectory(Configuration, String)}.
+   * This method only takes {@code getParent()} of that directory; all path
+   * construction is delegated to the linked method.
+   * @param conf configuration defining default FS.
+   * @param uuid uuid of job
+   * @return the parent path of the multipart upload commits directory
+   * for the given job uuid.
+   * @throws IOException on an IO failure.
+   */
+  public static Path getStagingUploadsParentDirectory(Configuration conf,
+      String uuid) throws IOException {
+    return getMultipartUploadCommitsDirectory(conf, uuid).getParent();
+  }
+
   /**
   /**
    * Build a qualified temporary path for the multipart upload commit
    * Build a qualified temporary path for the multipart upload commit
    * information in the cluster filesystem.
    * information in the cluster filesystem.

+ 21 - 11
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitter.java

@@ -501,8 +501,8 @@ public class StagingCommitter extends AbstractS3ACommitter {
 
 
   /**
   /**
    * Staging committer cleanup includes calling wrapped committer's
    * Staging committer cleanup includes calling wrapped committer's
-   * cleanup method, and removing all destination paths in the final
-   * filesystem.
+   * cleanup method, and removing staging uploads path and all
+   * destination paths in the final filesystem.
    * @param commitContext commit context
    * @param commitContext commit context
    * @param suppressExceptions should exceptions be suppressed?
    * @param suppressExceptions should exceptions be suppressed?
    * @throws IOException IO failures if exceptions are not suppressed.
    * @throws IOException IO failures if exceptions are not suppressed.
@@ -515,6 +515,9 @@ public class StagingCommitter extends AbstractS3ACommitter {
     maybeIgnore(suppressExceptions, "Cleanup wrapped committer",
     maybeIgnore(suppressExceptions, "Cleanup wrapped committer",
         () -> wrappedCommitter.cleanupJob(
         () -> wrappedCommitter.cleanupJob(
             commitContext.getJobContext()));
             commitContext.getJobContext()));
+    maybeIgnore(suppressExceptions, "Delete staging uploads path",
+        () -> deleteStagingUploadsParentDirectory(
+            commitContext.getJobContext()));
     maybeIgnore(suppressExceptions, "Delete destination paths",
     maybeIgnore(suppressExceptions, "Delete destination paths",
         () -> deleteDestinationPaths(
         () -> deleteDestinationPaths(
             commitContext.getJobContext()));
             commitContext.getJobContext()));
@@ -543,11 +546,26 @@ public class StagingCommitter extends AbstractS3ACommitter {
     }
     }
   }
   }
 
 
+  /**
+   * Delete the staging uploads parent directory of this job: the path
+   * returned by {@code Paths.getStagingUploadsParentDirectory()} for the job
+   * context's configuration and this committer's UUID.
+   * The delete is issued through {@code deleteWithWarning()} with a final
+   * argument of {@code true} (presumably the recursive flag; confirm), and
+   * the whole call is wrapped in {@code ignoreIOExceptions()}, so delete
+   * failures are presumably logged rather than rethrown (confirm against
+   * that helper).
+   * NOTE(review): the path is built from {@code context.getConfiguration()}
+   * while its filesystem is resolved with {@code getConf()}; confirm both
+   * configurations always resolve to the same filesystem.
+   * @param context job context
+   * @throws IOException IO failure
+   */
+  protected void deleteStagingUploadsParentDirectory(JobContext context)
+          throws IOException {
+    Path stagingUploadsPath = Paths.getStagingUploadsParentDirectory(
+            context.getConfiguration(), getUUID());
+    ignoreIOExceptions(LOG,
+        "Deleting staging uploads path", stagingUploadsPath.toString(),
+        () -> deleteWithWarning(
+            stagingUploadsPath.getFileSystem(getConf()),
+            stagingUploadsPath,
+            true));
+  }
 
 
   /**
   /**
    * Delete the working paths of a job.
    * Delete the working paths of a job.
    * <ol>
    * <ol>
-   *   <li>The job attempt path</li>
    *   <li>{@code $dest/__temporary}</li>
    *   <li>{@code $dest/__temporary}</li>
    *   <li>the local working directory for staged files</li>
    *   <li>the local working directory for staged files</li>
    * </ol>
    * </ol>
@@ -556,14 +574,6 @@ public class StagingCommitter extends AbstractS3ACommitter {
    * @throws IOException IO failure
    * @throws IOException IO failure
    */
    */
   protected void deleteDestinationPaths(JobContext context) throws IOException {
   protected void deleteDestinationPaths(JobContext context) throws IOException {
-    Path attemptPath = getJobAttemptPath(context);
-    ignoreIOExceptions(LOG,
-        "Deleting Job attempt Path", attemptPath.toString(),
-        () -> deleteWithWarning(
-            getJobAttemptFileSystem(context),
-            attemptPath,
-            true));
-
     // delete the __temporary directory. This will cause problems
     // delete the __temporary directory. This will cause problems
     // if there is >1 task targeting the same dest dir
     // if there is >1 task targeting the same dest dir
     deleteWithWarning(getDestFS(),
     deleteWithWarning(getDestFS(),

+ 24 - 0
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitProtocol.java

@@ -403,6 +403,30 @@ public abstract class AbstractITCommitProtocol extends AbstractCommitITest {
       this.committer = committer;
       this.committer = committer;
       conf = job.getConfiguration();
       conf = job.getConfiguration();
     }
     }
+
+    public Job getJob() { // plain read accessor for the job field
+      return job;
+    }
+
+    public JobContext getJContext() { // job context; read by the staging uploads cleanup tests
+      return jContext;
+    }
+
+    public TaskAttemptContext getTContext() { // task attempt context; read by the staging uploads cleanup tests
+      return tContext;
+    }
+
+    public AbstractS3ACommitter getCommitter() { // committer; the staging tests cast it to StagingCommitter
+      return committer;
+    }
+
+    public Configuration getConf() { // the configuration captured from the job in the constructor
+      return conf;
+    }
+
+    public Path getWrittenTextPath() { // plain read accessor for the writtenTextPath field
+      return writtenTextPath;
+    }
   }
   }
 
 
   /**
   /**

+ 70 - 0
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestStagingCommitProtocol.java

@@ -21,6 +21,8 @@ package org.apache.hadoop.fs.s3a.commit.staging.integration;
 import java.io.IOException;
 import java.io.IOException;
 import java.util.UUID;
 import java.util.UUID;
 
 
+import org.junit.Test;
+
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileStatus;
@@ -141,6 +143,74 @@ public class ITestStagingCommitProtocol extends AbstractITCommitProtocol {
     assertEquals("file", wd.toUri().getScheme());
     assertEquals("file", wd.toUri().getScheme());
   }
   }
 
 
+  @Test // success path: the directory must exist before the commit and be gone after commitJob
+  public void testStagingUploadsDirectoryCleanedUp() throws Exception {
+    describe("Assert that the staging uploads directory is cleaned up after successful commit");
+    JobData jobData = startJob(false);
+    JobContext jContext = jobData.getJContext();
+    TaskAttemptContext tContext = jobData.getTContext();
+    StagingCommitter committer = (StagingCommitter) jobData.getCommitter();
+
+    Path stagingUploadsDir = Paths.getStagingUploadsParentDirectory(
+            jContext.getConfiguration(),
+            committer.getUUID());
+
+    ContractTestUtils.assertPathExists(
+            stagingUploadsDir.getFileSystem(jContext.getConfiguration()),
+            "staging uploads path must exist after setupJob",
+            stagingUploadsDir
+    );
+
+    // write output for the task attempt
+    writeTextOutput(tContext);
+
+    // do commit: the task first, then the whole job
+    committer.commitTask(tContext);
+
+    commitJob(committer, jContext);
+
+    ContractTestUtils.assertPathDoesNotExist(
+            stagingUploadsDir.getFileSystem(jContext.getConfiguration()),
+            "staging uploads path must not exist after commitJob",
+            stagingUploadsDir
+    );
+  }
+
+  @Test // failure path: same existence checks, but using a committer built by FailingCommitterFactory
+  public void testStagingUploadsDirectoryCleanedUpWithFailure() throws Exception {
+    describe("Assert that the staging uploads directory is cleaned up after failed commit");
+    JobData jobData = startJob(new FailingCommitterFactory(), false);
+    JobContext jContext = jobData.getJContext();
+    TaskAttemptContext tContext = jobData.getTContext();
+    StagingCommitter committer = (StagingCommitter) jobData.getCommitter();
+
+    Path stagingUploadsDir = Paths.getStagingUploadsParentDirectory(
+            jContext.getConfiguration(),
+            committer.getUUID());
+
+    ContractTestUtils.assertPathExists(
+            stagingUploadsDir.getFileSystem(jContext.getConfiguration()),
+            "staging uploads path must exist after setupJob",
+            stagingUploadsDir
+    );
+
+    // do task commit (no task output is written in this test)
+    committer.commitTask(tContext);
+
+    // now fail job: this job commit attempt is expected to hit the simulated failure
+    expectSimulatedFailureOnJobCommit(jContext, committer);
+
+    commitJob(committer, jContext); // NOTE(review): presumably the retry that succeeds after the simulated failure; confirm
+
+    expectJobCommitToFail(jContext, committer); // NOTE(review): presumably a further commit after success must fail; confirm
+
+    ContractTestUtils.assertPathDoesNotExist(
+            stagingUploadsDir.getFileSystem(jContext.getConfiguration()),
+            "staging uploads path must not exist after commitJob",
+            stagingUploadsDir
+    );
+  }
+
   /**
   /**
    * The class provides a overridden implementation of commitJobInternal which
    * The class provides a overridden implementation of commitJobInternal which
    * causes the commit failed for the first time then succeed.
    * causes the commit failed for the first time then succeed.