ソースを参照

HADOOP-18582. skip unnecessary cleanup logic in distcp (#5251)

Co-authored-by: 万康 <mingge@xiaohongshu.com>
Reviewed-by: Steve Loughran <stevel@apache.org>
Signed-off-by: Ayush Saxena <ayushsaxena@apache.org>
Signed-off-by: Chris Nauroth <cnauroth@apache.org>
kevin wan 2 年 前
コミット
3b7b79b37a

+ 11 - 2
hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyCommitter.java

@@ -152,9 +152,18 @@ public class CopyCommitter extends FileOutputCommitter {
   }
   }
 
 
   private void cleanupTempFiles(JobContext context) {
   private void cleanupTempFiles(JobContext context) {
-    try {
-      Configuration conf = context.getConfiguration();
+    Configuration conf = context.getConfiguration();
+
+    final boolean directWrite = conf.getBoolean(
+        DistCpOptionSwitch.DIRECT_WRITE.getConfigLabel(), false);
+    final boolean append = conf.getBoolean(
+        DistCpOptionSwitch.APPEND.getConfigLabel(), false);
+    final boolean useTempTarget = !append && !directWrite;
+    if (!useTempTarget) {
+      return;
+    }
 
 
+    try {
       Path targetWorkPath = new Path(conf.get(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH));
       Path targetWorkPath = new Path(conf.get(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH));
       FileSystem targetFS = targetWorkPath.getFileSystem(conf);
       FileSystem targetFS = targetWorkPath.getFileSystem(conf);
 
 

+ 71 - 0
hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyCommitter.java

@@ -18,6 +18,7 @@
 
 
 package org.apache.hadoop.tools.mapred;
 package org.apache.hadoop.tools.mapred;
 
 
+import org.apache.hadoop.fs.contract.ContractTestUtils;
 import org.slf4j.Logger;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configuration;
@@ -580,6 +581,76 @@ public class TestCopyCommitter {
     }
     }
   }
   }
 
 
+  @Test
+  public void testCommitWithCleanupTempFiles() throws IOException {
+    testCommitWithCleanup(true, false);
+    testCommitWithCleanup(false, true);
+    testCommitWithCleanup(true, true);
+    testCommitWithCleanup(false, false);
+  }
+
+  private void testCommitWithCleanup(boolean append, boolean directWrite)throws IOException {
+    TaskAttemptContext taskAttemptContext = getTaskAttemptContext(config);
+    JobID jobID = taskAttemptContext.getTaskAttemptID().getJobID();
+    JobContext jobContext = new JobContextImpl(
+        taskAttemptContext.getConfiguration(),
+        jobID);
+    Configuration conf = jobContext.getConfiguration();
+
+    String sourceBase;
+    String targetBase;
+    FileSystem fs = null;
+    try {
+      fs = FileSystem.get(conf);
+      sourceBase = "/tmp1/" + rand.nextLong();
+      targetBase = "/tmp1/" + rand.nextLong();
+
+      DistCpOptions options = new DistCpOptions.Builder(
+          Collections.singletonList(new Path(sourceBase)),
+          new Path("/out"))
+          .withAppend(append)
+          .withSyncFolder(true)
+          .withDirectWrite(directWrite)
+          .build();
+      options.appendToConf(conf);
+
+      DistCpContext context = new DistCpContext(options);
+      context.setTargetPathExists(false);
+
+
+      conf.set(CONF_LABEL_TARGET_WORK_PATH, targetBase);
+      conf.set(CONF_LABEL_TARGET_FINAL_PATH, targetBase);
+
+      Path tempFilePath = getTempFile(targetBase, taskAttemptContext);
+      createDirectory(fs, tempFilePath);
+
+      OutputCommitter committer = new CopyCommitter(
+          null, taskAttemptContext);
+      committer.commitJob(jobContext);
+
+      if (append || directWrite) {
+        ContractTestUtils.assertPathExists(fs, "Temp files should not be cleanup with append or direct option",
+            tempFilePath);
+      } else {
+        ContractTestUtils.assertPathDoesNotExist(
+            fs,
+            "Temp files should be clean up without append or direct option",
+            tempFilePath);
+      }
+    } finally {
+      TestDistCpUtils.delete(fs, "/tmp1");
+      TestDistCpUtils.delete(fs, "/meta");
+    }
+  }
+
+  private Path getTempFile(String targetWorkPath, TaskAttemptContext taskAttemptContext) {
+    Path tempFile = new Path(targetWorkPath, ".distcp.tmp." +
+        taskAttemptContext.getTaskAttemptID().toString() +
+        "." + System.currentTimeMillis());
+    LOG.info("Creating temp file: {}", tempFile);
+    return tempFile;
+  }
+
   /**
   /**
    * Create a source file and its DistCp working files with different checksum
    * Create a source file and its DistCp working files with different checksum
    * to test the checksum validation for copying blocks in parallel.
    * to test the checksum validation for copying blocks in parallel.