瀏覽代碼

MAPREDUCE-6607. Enable regex pattern matching when mapreduce.task.files.preserve.filepattern is set. Contributed by Kai Sasaki.

(cherry picked from commit a59f30272dd2a8901a41c5c4300ce9d196e62606)
Akira Ajisaka 9 年之前
父節點
當前提交
4422beab72

+ 26 - 5
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java

@@ -21,6 +21,7 @@ package org.apache.hadoop.mapreduce.v2.app;
 import java.io.IOException;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
+import java.nio.file.Paths;
 import java.security.NoSuchAlgorithmException;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
@@ -34,6 +35,8 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.logging.Log;
@@ -549,9 +552,27 @@ public class MRAppMaster extends CompositeService {
     });
   }
 
-  protected boolean keepJobFiles(JobConf conf) {
-    return (conf.getKeepTaskFilesPattern() != null || conf
-        .getKeepFailedTaskFiles());
+  private boolean isJobNamePatternMatch(JobConf conf, String jobTempDir) {
+    // Matched staging files should be preserved after job is finished.
+    if (conf.getKeepTaskFilesPattern() != null && jobTempDir != null) {
+      String jobFileName = Paths.get(jobTempDir).getFileName().toString();
+      Pattern pattern = Pattern.compile(conf.getKeepTaskFilesPattern());
+      Matcher matcher = pattern.matcher(jobFileName);
+      return matcher.find();
+    } else {
+      return false;
+    }
+  }
+
+  private boolean isKeepFailedTaskFiles(JobConf conf) {
+    // TODO: Decide which failed task files that should
+    // be kept are in application log directory.
+    return conf.getKeepFailedTaskFiles();
+  }
+
+  protected boolean keepJobFiles(JobConf conf, String jobTempDir) {
+    return isJobNamePatternMatch(conf, jobTempDir)
+            || isKeepFailedTaskFiles(conf);
   }
   
   /**
@@ -574,10 +595,10 @@ public class MRAppMaster extends CompositeService {
    */
   public void cleanupStagingDir() throws IOException {
     /* make sure we clean the staging files */
-    String jobTempDir = null;
+    String jobTempDir = getConfig().get(MRJobConfig.MAPREDUCE_JOB_DIR);
     FileSystem fs = getFileSystem(getConfig());
     try {
-      if (!keepJobFiles(new JobConf(getConfig()))) {
+      if (!keepJobFiles(new JobConf(getConfig()), jobTempDir)) {
         jobTempDir = getConfig().get(MRJobConfig.MAPREDUCE_JOB_DIR);
         if (jobTempDir == null) {
           LOG.warn("Job Staging directory is null");

+ 128 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java

@@ -60,6 +60,7 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.junit.After;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -76,6 +77,11 @@ import org.junit.Test;
    private final static RecordFactory recordFactory = RecordFactoryProvider.
        getRecordFactory(null);
 
+  @After
+  public void tearDown() {
+    conf.setBoolean(MRJobConfig.PRESERVE_FAILED_TASK_FILES, false);
+  }
+
    @Test
    public void testDeletionofStagingOnUnregistrationFailure()
        throws IOException {
@@ -245,6 +251,128 @@ import org.junit.Test;
      verify(fs).delete(stagingJobPath, true);
    }
 
+  @Test
+  public void testByPreserveFailedStaging() throws IOException {
+    conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, stagingJobDir);
+    // TODO: Decide which failed task files that should
+    // be kept are in application log directory.
+    // Currently all files are not deleted from staging dir.
+    conf.setBoolean(MRJobConfig.PRESERVE_FAILED_TASK_FILES, true);
+    fs = mock(FileSystem.class);
+    when(fs.delete(any(Path.class), anyBoolean())).thenReturn(true);
+    //Staging Dir exists
+    String user = UserGroupInformation.getCurrentUser().getShortUserName();
+    Path stagingDir = MRApps.getStagingAreaDir(conf, user);
+    when(fs.exists(stagingDir)).thenReturn(true);
+    ApplicationId appId
+        = ApplicationId.newInstance(System.currentTimeMillis(), 0);
+    ApplicationAttemptId attemptId
+        = ApplicationAttemptId.newInstance(appId, 1);
+    JobId jobid = recordFactory.newRecordInstance(JobId.class);
+    jobid.setAppId(appId);
+    ContainerAllocator mockAlloc = mock(ContainerAllocator.class);
+    Assert.assertTrue(MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS > 1);
+    MRAppMaster appMaster = new TestMRApp(attemptId, mockAlloc,
+            JobStateInternal.FAILED, MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS);
+    appMaster.init(conf);
+    appMaster.start();
+    appMaster.shutDownJob();
+    //test whether notifyIsLastAMRetry called
+    Assert.assertEquals(true, ((TestMRApp) appMaster).getTestIsLastAMRetry());
+    verify(fs, times(0)).delete(stagingJobPath, true);
+  }
+
+  @Test
+  public void testPreservePatternMatchedStaging() throws IOException {
+    conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, stagingJobDir);
+    // The staging files that are matched to the pattern
+    // should not be deleted
+    conf.set(MRJobConfig.PRESERVE_FILES_PATTERN, "JobDir");
+    fs = mock(FileSystem.class);
+    when(fs.delete(any(Path.class), anyBoolean())).thenReturn(true);
+    //Staging Dir exists
+    String user = UserGroupInformation.getCurrentUser().getShortUserName();
+    Path stagingDir = MRApps.getStagingAreaDir(conf, user);
+    when(fs.exists(stagingDir)).thenReturn(true);
+    ApplicationId appId
+        = ApplicationId.newInstance(System.currentTimeMillis(), 0);
+    ApplicationAttemptId attemptId
+        = ApplicationAttemptId.newInstance(appId, 1);
+    JobId jobid = recordFactory.newRecordInstance(JobId.class);
+    jobid.setAppId(appId);
+    ContainerAllocator mockAlloc = mock(ContainerAllocator.class);
+    Assert.assertTrue(MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS > 1);
+    MRAppMaster appMaster = new TestMRApp(attemptId, mockAlloc,
+            JobStateInternal.RUNNING, MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS);
+    appMaster.init(conf);
+    appMaster.start();
+    appMaster.shutDownJob();
+    //test whether notifyIsLastAMRetry called
+    Assert.assertEquals(true, ((TestMRApp) appMaster).getTestIsLastAMRetry());
+    verify(fs, times(0)).delete(stagingJobPath, true);
+  }
+
+  @Test
+  public void testNotPreserveNotPatternMatchedStaging() throws IOException {
+    conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, stagingJobDir);
+    conf.set(MRJobConfig.PRESERVE_FILES_PATTERN, "NotMatching");
+    fs = mock(FileSystem.class);
+    when(fs.delete(any(Path.class), anyBoolean())).thenReturn(true);
+    //Staging Dir exists
+    String user = UserGroupInformation.getCurrentUser().getShortUserName();
+    Path stagingDir = MRApps.getStagingAreaDir(conf, user);
+    when(fs.exists(stagingDir)).thenReturn(true);
+    ApplicationId appId
+        = ApplicationId.newInstance(System.currentTimeMillis(), 0);
+    ApplicationAttemptId attemptId
+        = ApplicationAttemptId.newInstance(appId, 1);
+    JobId jobid = recordFactory.newRecordInstance(JobId.class);
+    jobid.setAppId(appId);
+    ContainerAllocator mockAlloc = mock(ContainerAllocator.class);
+    Assert.assertTrue(MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS > 1);
+    MRAppMaster appMaster = new TestMRApp(attemptId, mockAlloc,
+            JobStateInternal.RUNNING, MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS);
+    appMaster.init(conf);
+    appMaster.start();
+    appMaster.shutDownJob();
+    //test whether notifyIsLastAMRetry called
+    Assert.assertEquals(true, ((TestMRApp) appMaster).getTestIsLastAMRetry());
+    //Staging dir should be deleted because it is not matched with
+    //PRESERVE_FILES_PATTERN
+    verify(fs, times(1)).delete(stagingJobPath, true);
+  }
+
+  @Test
+  public void testPreservePatternMatchedAndFailedStaging() throws IOException {
+    conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, stagingJobDir);
+    // When RESERVE_FILES_PATTERN and PRESERVE_FAILED_TASK_FILES are set,
+    // files in staging dir are always kept.
+    conf.set(MRJobConfig.PRESERVE_FILES_PATTERN, "JobDir");
+    conf.setBoolean(MRJobConfig.PRESERVE_FAILED_TASK_FILES, true);
+    fs = mock(FileSystem.class);
+    when(fs.delete(any(Path.class), anyBoolean())).thenReturn(true);
+    //Staging Dir exists
+    String user = UserGroupInformation.getCurrentUser().getShortUserName();
+    Path stagingDir = MRApps.getStagingAreaDir(conf, user);
+    when(fs.exists(stagingDir)).thenReturn(true);
+    ApplicationId appId
+        = ApplicationId.newInstance(System.currentTimeMillis(), 0);
+    ApplicationAttemptId attemptId
+        = ApplicationAttemptId.newInstance(appId, 1);
+    JobId jobid = recordFactory.newRecordInstance(JobId.class);
+    jobid.setAppId(appId);
+    ContainerAllocator mockAlloc = mock(ContainerAllocator.class);
+    Assert.assertTrue(MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS > 1);
+    MRAppMaster appMaster = new TestMRApp(attemptId, mockAlloc,
+            JobStateInternal.RUNNING, MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS);
+    appMaster.init(conf);
+    appMaster.start();
+    appMaster.shutDownJob();
+    //test whether notifyIsLastAMRetry called
+    Assert.assertEquals(true, ((TestMRApp) appMaster).getTestIsLastAMRetry());
+    verify(fs, times(0)).delete(stagingJobPath, true);
+  }
+
    private class TestMRApp extends MRAppMaster {
      ContainerAllocator allocator;
      boolean testIsLastAMRetry = false;