浏览代码

MAPREDUCE-6607. Enable regex pattern matching when mapreduce.task.files.preserve.filepattern is set. Contributed by Kai Sasaki.

Akira Ajisaka 9 年之前
父节点
当前提交
3b3b63081b

+ 26 - 6
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java

@@ -21,6 +21,7 @@ package org.apache.hadoop.mapreduce.v2.app;
 import java.io.IOException;
 import java.io.IOException;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.InvocationTargetException;
+import java.nio.file.Paths;
 import java.security.NoSuchAlgorithmException;
 import java.security.NoSuchAlgorithmException;
 import java.security.PrivilegedExceptionAction;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.ArrayList;
@@ -34,6 +35,8 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.Log;
@@ -567,9 +570,27 @@ public class MRAppMaster extends CompositeService {
           NoopAMPreemptionPolicy.class, AMPreemptionPolicy.class), conf);
           NoopAMPreemptionPolicy.class, AMPreemptionPolicy.class), conf);
   }
   }
 
 
-  protected boolean keepJobFiles(JobConf conf) {
-    return (conf.getKeepTaskFilesPattern() != null || conf
-        .getKeepFailedTaskFiles());
+  /**
+   * Checks whether the last component of the job staging directory matches
+   * the configured keep-task-files pattern.
+   *
+   * @param conf job configuration supplying the keep-task-files pattern
+   * @param jobTempDir job staging directory path, may be null
+   * @return true if a pattern is set and it is found in the directory name;
+   *         false if either input is null, the path has no name component,
+   *         or nothing matches
+   */
+  private boolean isJobNamePatternMatch(JobConf conf, String jobTempDir) {
+    // Matched staging files should be preserved after job is finished.
+    String keepPattern = conf.getKeepTaskFilesPattern();
+    if (keepPattern == null || jobTempDir == null) {
+      return false;
+    }
+    // getFileName() returns null for a path with no name elements
+    // (e.g. "/"); treat that as "no match" rather than throwing a
+    // NullPointerException while cleaning up the staging directory.
+    java.nio.file.Path jobFileName = Paths.get(jobTempDir).getFileName();
+    if (jobFileName == null) {
+      return false;
+    }
+    // find() looks for the pattern anywhere in the name, not a full match.
+    return Pattern.compile(keepPattern)
+        .matcher(jobFileName.toString()).find();
+  }
+
+  private boolean isKeepFailedTaskFiles(JobConf conf) {
+    // TODO: Decide which of the failed task files that should be kept
+    // are located in the application log directory.
+    return conf.getKeepFailedTaskFiles();
+  }
+
+  protected boolean keepJobFiles(JobConf conf, String jobTempDir) {
+    return isJobNamePatternMatch(conf, jobTempDir)
+            || isKeepFailedTaskFiles(conf);
   }
   }
   
   
   /**
   /**
@@ -592,11 +613,10 @@ public class MRAppMaster extends CompositeService {
    */
    */
   public void cleanupStagingDir() throws IOException {
   public void cleanupStagingDir() throws IOException {
     /* make sure we clean the staging files */
     /* make sure we clean the staging files */
-    String jobTempDir = null;
+    String jobTempDir = getConfig().get(MRJobConfig.MAPREDUCE_JOB_DIR);
     FileSystem fs = getFileSystem(getConfig());
     FileSystem fs = getFileSystem(getConfig());
     try {
     try {
-      if (!keepJobFiles(new JobConf(getConfig()))) {
-        jobTempDir = getConfig().get(MRJobConfig.MAPREDUCE_JOB_DIR);
+      if (!keepJobFiles(new JobConf(getConfig()), jobTempDir)) {
         if (jobTempDir == null) {
         if (jobTempDir == null) {
           LOG.warn("Job Staging directory is null");
           LOG.warn("Job Staging directory is null");
           return;
           return;

+ 120 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java

@@ -60,6 +60,7 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.junit.After;
 import org.junit.Assert;
 import org.junit.Assert;
 import org.junit.Test;
 import org.junit.Test;
 
 
@@ -76,6 +77,11 @@ import org.junit.Test;
    private final static RecordFactory recordFactory = RecordFactoryProvider.
    private final static RecordFactory recordFactory = RecordFactoryProvider.
        getRecordFactory(null);
        getRecordFactory(null);
 
 
+   @After
+   public void tearDown() {
+     conf.setBoolean(MRJobConfig.PRESERVE_FAILED_TASK_FILES, false);
+   }
+
    @Test
    @Test
    public void testDeletionofStagingOnUnregistrationFailure()
    public void testDeletionofStagingOnUnregistrationFailure()
        throws IOException {
        throws IOException {
@@ -245,6 +251,120 @@ import org.junit.Test;
      verify(fs).delete(stagingJobPath, true);
      verify(fs).delete(stagingJobPath, true);
    }
    }
 
 
+   @Test
+   public void testByPreserveFailedStaging() throws IOException {
+     conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, stagingJobDir);
+     // TODO: Decide which of the failed task files that should be kept
+     // are located in the application log directory.
+     // Currently none of the files are deleted from the staging dir.
+     conf.setBoolean(MRJobConfig.PRESERVE_FAILED_TASK_FILES, true);
+     fs = mock(FileSystem.class);
+     when(fs.delete(any(Path.class), anyBoolean())).thenReturn(true);
+     //Staging Dir exists
+     String user = UserGroupInformation.getCurrentUser().getShortUserName();
+     Path stagingDir = MRApps.getStagingAreaDir(conf, user);
+     when(fs.exists(stagingDir)).thenReturn(true);
+     ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 0);
+     ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(appId, 1);
+     JobId jobid = recordFactory.newRecordInstance(JobId.class);
+     jobid.setAppId(appId);
+     ContainerAllocator mockAlloc = mock(ContainerAllocator.class);
+     Assert.assertTrue(MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS > 1);
+     MRAppMaster appMaster = new TestMRApp(attemptId, mockAlloc,
+             JobStateInternal.FAILED, MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS);
+     appMaster.init(conf);
+     appMaster.start();
+     appMaster.shutDownJob();
+     //test whether notifyIsLastAMRetry called
+     Assert.assertEquals(true, ((TestMRApp) appMaster).getTestIsLastAMRetry());
+     verify(fs, times(0)).delete(stagingJobPath, true);
+   }
+
+   @Test
+   public void testPreservePatternMatchedStaging() throws IOException {
+     conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, stagingJobDir);
+     // The staging files that are matched to the pattern
+     // should not be deleted
+     conf.set(MRJobConfig.PRESERVE_FILES_PATTERN, "JobDir");
+     fs = mock(FileSystem.class);
+     when(fs.delete(any(Path.class), anyBoolean())).thenReturn(true);
+     //Staging Dir exists
+     String user = UserGroupInformation.getCurrentUser().getShortUserName();
+     Path stagingDir = MRApps.getStagingAreaDir(conf, user);
+     when(fs.exists(stagingDir)).thenReturn(true);
+     ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 0);
+     ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(appId, 1);
+     JobId jobid = recordFactory.newRecordInstance(JobId.class);
+     jobid.setAppId(appId);
+     ContainerAllocator mockAlloc = mock(ContainerAllocator.class);
+     Assert.assertTrue(MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS > 1);
+     MRAppMaster appMaster = new TestMRApp(attemptId, mockAlloc,
+             JobStateInternal.RUNNING, MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS);
+     appMaster.init(conf);
+     appMaster.start();
+     appMaster.shutDownJob();
+     //test whether notifyIsLastAMRetry called
+     Assert.assertEquals(true, ((TestMRApp) appMaster).getTestIsLastAMRetry());
+     verify(fs, times(0)).delete(stagingJobPath, true);
+   }
+
+  @Test
+  public void testNotPreserveNotPatternMatchedStaging() throws IOException {
+    conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, stagingJobDir);
+    conf.set(MRJobConfig.PRESERVE_FILES_PATTERN, "NotMatching");
+    fs = mock(FileSystem.class);
+    when(fs.delete(any(Path.class), anyBoolean())).thenReturn(true);
+    //Staging Dir exists
+    String user = UserGroupInformation.getCurrentUser().getShortUserName();
+    Path stagingDir = MRApps.getStagingAreaDir(conf, user);
+    when(fs.exists(stagingDir)).thenReturn(true);
+    ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 0);
+    ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(appId, 1);
+    JobId jobid = recordFactory.newRecordInstance(JobId.class);
+    jobid.setAppId(appId);
+    ContainerAllocator mockAlloc = mock(ContainerAllocator.class);
+    Assert.assertTrue(MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS > 1);
+    MRAppMaster appMaster = new TestMRApp(attemptId, mockAlloc,
+            JobStateInternal.RUNNING, MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS);
+    appMaster.init(conf);
+    appMaster.start();
+    appMaster.shutDownJob();
+    //test whether notifyIsLastAMRetry called
+    Assert.assertEquals(true, ((TestMRApp) appMaster).getTestIsLastAMRetry());
+    //Staging dir should be deleted because it is not matched with
+    //PRESERVE_FILES_PATTERN
+    verify(fs, times(1)).delete(stagingJobPath, true);
+  }
+
+  @Test
+  public void testPreservePatternMatchedAndFailedStaging() throws IOException {
+    conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, stagingJobDir);
+    // When PRESERVE_FILES_PATTERN and PRESERVE_FAILED_TASK_FILES are set,
+    // files in staging dir are always kept.
+    conf.set(MRJobConfig.PRESERVE_FILES_PATTERN, "JobDir");
+    conf.setBoolean(MRJobConfig.PRESERVE_FAILED_TASK_FILES, true);
+    fs = mock(FileSystem.class);
+    when(fs.delete(any(Path.class), anyBoolean())).thenReturn(true);
+    //Staging Dir exists
+    String user = UserGroupInformation.getCurrentUser().getShortUserName();
+    Path stagingDir = MRApps.getStagingAreaDir(conf, user);
+    when(fs.exists(stagingDir)).thenReturn(true);
+    ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 0);
+    ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(appId, 1);
+    JobId jobid = recordFactory.newRecordInstance(JobId.class);
+    jobid.setAppId(appId);
+    ContainerAllocator mockAlloc = mock(ContainerAllocator.class);
+    Assert.assertTrue(MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS > 1);
+    MRAppMaster appMaster = new TestMRApp(attemptId, mockAlloc,
+            JobStateInternal.RUNNING, MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS);
+    appMaster.init(conf);
+    appMaster.start();
+    appMaster.shutDownJob();
+    //test whether notifyIsLastAMRetry called
+    Assert.assertEquals(true, ((TestMRApp) appMaster).getTestIsLastAMRetry());
+    verify(fs, times(0)).delete(stagingJobPath, true);
+  }
+
    private class TestMRApp extends MRAppMaster {
    private class TestMRApp extends MRAppMaster {
      ContainerAllocator allocator;
      ContainerAllocator allocator;
      boolean testIsLastAMRetry = false;
      boolean testIsLastAMRetry = false;