浏览代码

MAPREDUCE-5062. Fix MR AM to read max-retries from the RM. Contributed by *Zhijie Shen.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1460923 13f79535-47bb-0310-9956-ffa450edef68
Vinod Kumar Vavilapalli 12 年之前
父节点
当前提交
46315a2d91

+ 3 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -253,6 +253,9 @@ Release 2.0.5-beta - UNRELEASED
     MAPREDUCE-5083. MiniMRCluster should use a random component when creating an
     MAPREDUCE-5083. MiniMRCluster should use a random component when creating an
     actual cluster (Siddharth Seth via hitesh)
     actual cluster (Siddharth Seth via hitesh)
 
 
+    MAPREDUCE-5062. Fix MR AM to read max-retries from the RM. (Zhijie Shen via
+    vinodkv)
+
 Release 2.0.4-alpha - UNRELEASED
 Release 2.0.4-alpha - UNRELEASED
 
 
   INCOMPATIBLE CHANGES
   INCOMPATIBLE CHANGES

+ 27 - 15
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java

@@ -161,6 +161,7 @@ public class MRAppMaster extends CompositeService {
   private final int nmPort;
   private final int nmPort;
   private final int nmHttpPort;
   private final int nmHttpPort;
   protected final MRAppMetrics metrics;
   protected final MRAppMetrics metrics;
+  private final int maxAppAttempts;
   private Map<TaskId, TaskInfo> completedTasksFromPreviousRun;
   private Map<TaskId, TaskInfo> completedTasksFromPreviousRun;
   private List<AMInfo> amInfos;
   private List<AMInfo> amInfos;
   private AppContext context;
   private AppContext context;
@@ -194,14 +195,14 @@ public class MRAppMaster extends CompositeService {
 
 
   public MRAppMaster(ApplicationAttemptId applicationAttemptId,
   public MRAppMaster(ApplicationAttemptId applicationAttemptId,
       ContainerId containerId, String nmHost, int nmPort, int nmHttpPort,
       ContainerId containerId, String nmHost, int nmPort, int nmHttpPort,
-      long appSubmitTime) {
+      long appSubmitTime, int maxAppAttempts) {
     this(applicationAttemptId, containerId, nmHost, nmPort, nmHttpPort,
     this(applicationAttemptId, containerId, nmHost, nmPort, nmHttpPort,
-        new SystemClock(), appSubmitTime);
+        new SystemClock(), appSubmitTime, maxAppAttempts);
   }
   }
 
 
   public MRAppMaster(ApplicationAttemptId applicationAttemptId,
   public MRAppMaster(ApplicationAttemptId applicationAttemptId,
       ContainerId containerId, String nmHost, int nmPort, int nmHttpPort,
       ContainerId containerId, String nmHost, int nmPort, int nmHttpPort,
-      Clock clock, long appSubmitTime) {
+      Clock clock, long appSubmitTime, int maxAppAttempts) {
     super(MRAppMaster.class.getName());
     super(MRAppMaster.class.getName());
     this.clock = clock;
     this.clock = clock;
     this.startTime = clock.getTime();
     this.startTime = clock.getTime();
@@ -212,6 +213,7 @@ public class MRAppMaster extends CompositeService {
     this.nmPort = nmPort;
     this.nmPort = nmPort;
     this.nmHttpPort = nmHttpPort;
     this.nmHttpPort = nmHttpPort;
     this.metrics = MRAppMetrics.create();
     this.metrics = MRAppMetrics.create();
+    this.maxAppAttempts = maxAppAttempts;
     LOG.info("Created MRAppMaster for application " + applicationAttemptId);
     LOG.info("Created MRAppMaster for application " + applicationAttemptId);
   }
   }
 
 
@@ -220,18 +222,13 @@ public class MRAppMaster extends CompositeService {
     conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true);
     conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true);
 
 
     downloadTokensAndSetupUGI(conf);
     downloadTokensAndSetupUGI(conf);
-    
-    //TODO this is a hack, we really need the RM to inform us when we
-    // are the last one.  This would allow us to configure retries on
-    // a per application basis.
-    int numAMRetries = conf.getInt(YarnConfiguration.RM_AM_MAX_RETRIES, 
-        YarnConfiguration.DEFAULT_RM_AM_MAX_RETRIES);
-    isLastAMRetry = appAttemptID.getAttemptId() >= numAMRetries;
-    LOG.info("AM Retries: " + numAMRetries + 
-        " attempt num: " + appAttemptID.getAttemptId() +
+
+    isLastAMRetry = appAttemptID.getAttemptId() >= maxAppAttempts;
+    LOG.info("The specific max attempts: " + maxAppAttempts +
+        " for application: " + appAttemptID.getApplicationId().getId() +
+        ". Attempt num: " + appAttemptID.getAttemptId() +
         " is last retry: " + isLastAMRetry);
         " is last retry: " + isLastAMRetry);
-    
-    
+
     context = new RunningAppContext(conf);
     context = new RunningAppContext(conf);
 
 
     // Job name is the same as the app name util we support DAG of jobs
     // Job name is the same as the app name util we support DAG of jobs
@@ -266,6 +263,9 @@ public class MRAppMaster extends CompositeService {
       boolean commitFailure = fs.exists(endCommitFailureFile);
       boolean commitFailure = fs.exists(endCommitFailureFile);
       if(!stagingExists) {
       if(!stagingExists) {
         isLastAMRetry = true;
         isLastAMRetry = true;
+        LOG.info("Attempt num: " + appAttemptID.getAttemptId() +
+            " is last retry: " + isLastAMRetry +
+            " because the staging dir doesn't exist.");
         errorHappenedShutDown = true;
         errorHappenedShutDown = true;
         forcedState = JobStateInternal.ERROR;
         forcedState = JobStateInternal.ERROR;
         shutDownMessage = "Staging dir does not exist " + stagingDir;
         shutDownMessage = "Staging dir does not exist " + stagingDir;
@@ -275,6 +275,9 @@ public class MRAppMaster extends CompositeService {
         // what result we will use to notify, and how we will unregister
         // what result we will use to notify, and how we will unregister
         errorHappenedShutDown = true;
         errorHappenedShutDown = true;
         isLastAMRetry = true;
         isLastAMRetry = true;
+        LOG.info("Attempt num: " + appAttemptID.getAttemptId() +
+            " is last retry: " + isLastAMRetry +
+            " because a commit was started.");
         copyHistory = true;
         copyHistory = true;
         if (commitSuccess) {
         if (commitSuccess) {
           shutDownMessage = "We crashed after successfully committing. Recovering.";
           shutDownMessage = "We crashed after successfully committing. Recovering.";
@@ -777,6 +780,10 @@ public class MRAppMaster extends CompositeService {
     return taskAttemptListener;
     return taskAttemptListener;
   }
   }
 
 
+  public Boolean isLastAMRetry() {
+    return isLastAMRetry;
+  }
+
   /**
   /**
    * By the time life-cycle of this router starts, job-init would have already
    * By the time life-cycle of this router starts, job-init would have already
    * happened.
    * happened.
@@ -1206,6 +1213,8 @@ public class MRAppMaster extends CompositeService {
           System.getenv(ApplicationConstants.NM_HTTP_PORT_ENV);
           System.getenv(ApplicationConstants.NM_HTTP_PORT_ENV);
       String appSubmitTimeStr =
       String appSubmitTimeStr =
           System.getenv(ApplicationConstants.APP_SUBMIT_TIME_ENV);
           System.getenv(ApplicationConstants.APP_SUBMIT_TIME_ENV);
+      String maxAppAttempts =
+          System.getenv(ApplicationConstants.MAX_APP_ATTEMPTS_ENV);
       
       
       validateInputParam(containerIdStr,
       validateInputParam(containerIdStr,
           ApplicationConstants.AM_CONTAINER_ID_ENV);
           ApplicationConstants.AM_CONTAINER_ID_ENV);
@@ -1215,6 +1224,8 @@ public class MRAppMaster extends CompositeService {
           ApplicationConstants.NM_HTTP_PORT_ENV);
           ApplicationConstants.NM_HTTP_PORT_ENV);
       validateInputParam(appSubmitTimeStr,
       validateInputParam(appSubmitTimeStr,
           ApplicationConstants.APP_SUBMIT_TIME_ENV);
           ApplicationConstants.APP_SUBMIT_TIME_ENV);
+      validateInputParam(maxAppAttempts,
+          ApplicationConstants.MAX_APP_ATTEMPTS_ENV);
 
 
       ContainerId containerId = ConverterUtils.toContainerId(containerIdStr);
       ContainerId containerId = ConverterUtils.toContainerId(containerIdStr);
       ApplicationAttemptId applicationAttemptId =
       ApplicationAttemptId applicationAttemptId =
@@ -1224,7 +1235,8 @@ public class MRAppMaster extends CompositeService {
       MRAppMaster appMaster =
       MRAppMaster appMaster =
           new MRAppMaster(applicationAttemptId, containerId, nodeHostString,
           new MRAppMaster(applicationAttemptId, containerId, nodeHostString,
               Integer.parseInt(nodePortString),
               Integer.parseInt(nodePortString),
-              Integer.parseInt(nodeHttpPortString), appSubmitTime);
+              Integer.parseInt(nodeHttpPortString), appSubmitTime,
+              Integer.parseInt(maxAppAttempts));
       ShutdownHookManager.get().addShutdownHook(
       ShutdownHookManager.get().addShutdownHook(
         new MRAppMasterShutdownHook(appMaster), SHUTDOWN_HOOK_PRIORITY);
         new MRAppMasterShutdownHook(appMaster), SHUTDOWN_HOOK_PRIORITY);
       YarnConfiguration conf = new YarnConfiguration(new JobConf());
       YarnConfiguration conf = new YarnConfiguration(new JobConf());

+ 1 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java

@@ -192,7 +192,7 @@ public class MRApp extends MRAppMaster {
       int maps, int reduces, boolean autoComplete, String testName,
       int maps, int reduces, boolean autoComplete, String testName,
       boolean cleanOnStart, int startCount, Clock clock) {
       boolean cleanOnStart, int startCount, Clock clock) {
     super(appAttemptId, amContainerId, NM_HOST, NM_PORT, NM_HTTP_PORT, clock, System
     super(appAttemptId, amContainerId, NM_HOST, NM_PORT, NM_HTTP_PORT, clock, System
-        .currentTimeMillis());
+        .currentTimeMillis(), MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS);
     this.testWorkDir = new File("target", testName);
     this.testWorkDir = new File("target", testName);
     testAbsPath = new Path(testWorkDir.getAbsolutePath());
     testAbsPath = new Path(testWorkDir.getAbsolutePath());
     LOG.info("PathUsed: " + testAbsPath);
     LOG.info("PathUsed: " + testAbsPath);

+ 73 - 16
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRAppMaster.java

@@ -30,11 +30,15 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
 import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
+import org.apache.hadoop.mapreduce.v2.app.commit.CommitterEvent;
+import org.apache.hadoop.mapreduce.v2.app.commit.CommitterEventHandler;
 import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal;
 import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal;
 import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
 import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
+import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler;
 import org.apache.hadoop.mapreduce.v2.util.MRApps;
 import org.apache.hadoop.mapreduce.v2.util.MRApps;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -42,6 +46,7 @@ import org.apache.hadoop.yarn.YarnException;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.junit.Before;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.BeforeClass;
@@ -80,7 +85,7 @@ public class TestMRAppMaster {
     ContainerId containerId = ConverterUtils.toContainerId(containerIdStr);
     ContainerId containerId = ConverterUtils.toContainerId(containerIdStr);
     MRAppMasterTest appMaster =
     MRAppMasterTest appMaster =
         new MRAppMasterTest(applicationAttemptId, containerId, "host", -1, -1,
         new MRAppMasterTest(applicationAttemptId, containerId, "host", -1, -1,
-            System.currentTimeMillis());
+            System.currentTimeMillis(), MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS);
     YarnConfiguration conf = new YarnConfiguration();
     YarnConfiguration conf = new YarnConfiguration();
     conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
     conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
     MRAppMaster.initAndStartAppMaster(appMaster, conf, userName);
     MRAppMaster.initAndStartAppMaster(appMaster, conf, userName);
@@ -109,7 +114,8 @@ public class TestMRAppMaster {
     ContainerId containerId = ConverterUtils.toContainerId(containerIdStr);
     ContainerId containerId = ConverterUtils.toContainerId(containerIdStr);
     MRAppMaster appMaster =
     MRAppMaster appMaster =
         new MRAppMasterTest(applicationAttemptId, containerId, "host", -1, -1,
         new MRAppMasterTest(applicationAttemptId, containerId, "host", -1, -1,
-            System.currentTimeMillis(), false);
+            System.currentTimeMillis(), MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS,
+            false, false);
     boolean caught = false;
     boolean caught = false;
     try {
     try {
       MRAppMaster.initAndStartAppMaster(appMaster, conf, userName);
       MRAppMaster.initAndStartAppMaster(appMaster, conf, userName);
@@ -144,7 +150,8 @@ public class TestMRAppMaster {
     ContainerId containerId = ConverterUtils.toContainerId(containerIdStr);
     ContainerId containerId = ConverterUtils.toContainerId(containerIdStr);
     MRAppMaster appMaster =
     MRAppMaster appMaster =
         new MRAppMasterTest(applicationAttemptId, containerId, "host", -1, -1,
         new MRAppMasterTest(applicationAttemptId, containerId, "host", -1, -1,
-            System.currentTimeMillis(), false);
+            System.currentTimeMillis(), MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS,
+            false, false);
     boolean caught = false;
     boolean caught = false;
     try {
     try {
       MRAppMaster.initAndStartAppMaster(appMaster, conf, userName);
       MRAppMaster.initAndStartAppMaster(appMaster, conf, userName);
@@ -179,7 +186,8 @@ public class TestMRAppMaster {
     ContainerId containerId = ConverterUtils.toContainerId(containerIdStr);
     ContainerId containerId = ConverterUtils.toContainerId(containerIdStr);
     MRAppMaster appMaster =
     MRAppMaster appMaster =
         new MRAppMasterTest(applicationAttemptId, containerId, "host", -1, -1,
         new MRAppMasterTest(applicationAttemptId, containerId, "host", -1, -1,
-            System.currentTimeMillis(), false);
+            System.currentTimeMillis(), MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS,
+            false, false);
     boolean caught = false;
     boolean caught = false;
     try {
     try {
       MRAppMaster.initAndStartAppMaster(appMaster, conf, userName);
       MRAppMaster.initAndStartAppMaster(appMaster, conf, userName);
@@ -214,7 +222,8 @@ public class TestMRAppMaster {
     ContainerId containerId = ConverterUtils.toContainerId(containerIdStr);
     ContainerId containerId = ConverterUtils.toContainerId(containerIdStr);
     MRAppMaster appMaster =
     MRAppMaster appMaster =
         new MRAppMasterTest(applicationAttemptId, containerId, "host", -1, -1,
         new MRAppMasterTest(applicationAttemptId, containerId, "host", -1, -1,
-            System.currentTimeMillis(), false);
+            System.currentTimeMillis(), MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS,
+            false, false);
     boolean caught = false;
     boolean caught = false;
     try {
     try {
       MRAppMaster.initAndStartAppMaster(appMaster, conf, userName);
       MRAppMaster.initAndStartAppMaster(appMaster, conf, userName);
@@ -230,36 +239,73 @@ public class TestMRAppMaster {
     assertEquals(JobStateInternal.ERROR, appMaster.forcedState);
     assertEquals(JobStateInternal.ERROR, appMaster.forcedState);
     appMaster.stop();
     appMaster.stop();
   }
   }
+
+  @Test (timeout = 30000)
+  public void testMRAppMasterMaxAppAttempts() throws IOException,
+      InterruptedException {
+    int[] maxAppAttemtps = new int[] { 1, 2, 3 };
+    Boolean[] expectedBools = new Boolean[]{ true, true, false };
+
+    String applicationAttemptIdStr = "appattempt_1317529182569_0004_000002";
+    String containerIdStr = "container_1317529182569_0004_000002_1";
+
+    String userName = "TestAppMasterUser";
+    ApplicationAttemptId applicationAttemptId = ConverterUtils
+        .toApplicationAttemptId(applicationAttemptIdStr);
+    ContainerId containerId = ConverterUtils.toContainerId(containerIdStr);
+    YarnConfiguration conf = new YarnConfiguration();
+    conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
+
+    File stagingDir =
+        new File(MRApps.getStagingAreaDir(conf, userName).toString());
+    stagingDir.mkdirs();
+    for (int i = 0; i < maxAppAttemtps.length; ++i) {
+      MRAppMasterTest appMaster =
+          new MRAppMasterTest(applicationAttemptId, containerId, "host", -1, -1,
+              System.currentTimeMillis(), maxAppAttemtps[i], false, true);
+      MRAppMaster.initAndStartAppMaster(appMaster, conf, userName);
+      assertEquals("isLastAMRetry is correctly computed.", expectedBools[i],
+          appMaster.isLastAMRetry());
+    }
+  }
+
 }
 }
 
 
 class MRAppMasterTest extends MRAppMaster {
 class MRAppMasterTest extends MRAppMaster {
 
 
   Path stagingDirPath;
   Path stagingDirPath;
   private Configuration conf;
   private Configuration conf;
-  private boolean overrideInitAndStart;
+  private boolean overrideInit;
+  private boolean overrideStart;
   ContainerAllocator mockContainerAllocator;
   ContainerAllocator mockContainerAllocator;
+  CommitterEventHandler mockCommitterEventHandler;
+  RMHeartbeatHandler mockRMHeartbeatHandler;
 
 
   public MRAppMasterTest(ApplicationAttemptId applicationAttemptId,
   public MRAppMasterTest(ApplicationAttemptId applicationAttemptId,
       ContainerId containerId, String host, int port, int httpPort,
       ContainerId containerId, String host, int port, int httpPort,
-      long submitTime) {
-    this(applicationAttemptId, containerId, host, port, httpPort, submitTime, 
-        true);
+      long submitTime, int maxAppAttempts) {
+    this(applicationAttemptId, containerId, host, port, httpPort,
+        submitTime, maxAppAttempts, true, true);
   }
   }
   public MRAppMasterTest(ApplicationAttemptId applicationAttemptId,
   public MRAppMasterTest(ApplicationAttemptId applicationAttemptId,
       ContainerId containerId, String host, int port, int httpPort,
       ContainerId containerId, String host, int port, int httpPort,
-      long submitTime, boolean overrideInitAndStart) {
-    super(applicationAttemptId, containerId, host, port, httpPort, submitTime);
-    this.overrideInitAndStart = overrideInitAndStart;
+      long submitTime, int maxAppAttempts, boolean overrideInit,
+      boolean overrideStart) {
+    super(applicationAttemptId, containerId, host, port, httpPort, submitTime,
+        maxAppAttempts);
+    this.overrideInit = overrideInit;
+    this.overrideStart = overrideStart;
     mockContainerAllocator = mock(ContainerAllocator.class);
     mockContainerAllocator = mock(ContainerAllocator.class);
+    mockCommitterEventHandler = mock(CommitterEventHandler.class);
+    mockRMHeartbeatHandler = mock(RMHeartbeatHandler.class);
   }
   }
 
 
   @Override
   @Override
   public void init(Configuration conf) {
   public void init(Configuration conf) {
-    if (overrideInitAndStart) {
-      this.conf = conf; 
-    } else {
+    if (!overrideInit) {
       super.init(conf);
       super.init(conf);
     }
     }
+    this.conf = conf;
   }
   }
   
   
   @Override 
   @Override 
@@ -277,9 +323,20 @@ class MRAppMasterTest extends MRAppMaster {
     return mockContainerAllocator;
     return mockContainerAllocator;
   }
   }
 
 
+  @Override
+  protected EventHandler<CommitterEvent> createCommitterEventHandler(
+      AppContext context, OutputCommitter committer) {
+    return mockCommitterEventHandler;
+  }
+
+  @Override
+  protected RMHeartbeatHandler getRMHeartbeatHandler() {
+    return mockRMHeartbeatHandler;
+  }
+
   @Override
   @Override
   public void start() {
   public void start() {
-    if (overrideInitAndStart) {
+    if (overrideStart) {
       try {
       try {
         String user = UserGroupInformation.getCurrentUser().getShortUserName();
         String user = UserGroupInformation.getCurrentUser().getShortUserName();
         stagingDirPath = MRApps.getStagingAreaDir(conf, user);
         stagingDirPath = MRApps.getStagingAreaDir(conf, user);

+ 10 - 11
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java

@@ -49,7 +49,6 @@ import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.YarnException;
 import org.apache.hadoop.yarn.YarnException;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.service.AbstractService;
 import org.apache.hadoop.yarn.service.AbstractService;
@@ -93,10 +92,9 @@ import org.junit.Test;
      verify(fs).delete(stagingJobPath, true);
      verify(fs).delete(stagingJobPath, true);
    }
    }
    
    
-   @Test
+   @Test (timeout = 30000)
    public void testDeletionofStagingOnKill() throws IOException {
    public void testDeletionofStagingOnKill() throws IOException {
      conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, stagingJobDir);
      conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, stagingJobDir);
-     conf.setInt(YarnConfiguration.RM_AM_MAX_RETRIES, 4);
      fs = mock(FileSystem.class);
      fs = mock(FileSystem.class);
      when(fs.delete(any(Path.class), anyBoolean())).thenReturn(true);
      when(fs.delete(any(Path.class), anyBoolean())).thenReturn(true);
      //Staging Dir exists
      //Staging Dir exists
@@ -113,7 +111,7 @@ import org.junit.Test;
      JobId jobid = recordFactory.newRecordInstance(JobId.class);
      JobId jobid = recordFactory.newRecordInstance(JobId.class);
      jobid.setAppId(appId);
      jobid.setAppId(appId);
      ContainerAllocator mockAlloc = mock(ContainerAllocator.class);
      ContainerAllocator mockAlloc = mock(ContainerAllocator.class);
-     MRAppMaster appMaster = new TestMRApp(attemptId, mockAlloc);
+     MRAppMaster appMaster = new TestMRApp(attemptId, mockAlloc, 4);
      appMaster.init(conf);
      appMaster.init(conf);
      //simulate the process being killed
      //simulate the process being killed
      MRAppMaster.MRAppMasterShutdownHook hook = 
      MRAppMaster.MRAppMasterShutdownHook hook = 
@@ -122,10 +120,9 @@ import org.junit.Test;
      verify(fs, times(0)).delete(stagingJobPath, true);
      verify(fs, times(0)).delete(stagingJobPath, true);
    }
    }
    
    
-   @Test
+   @Test (timeout = 30000)
    public void testDeletionofStagingOnKillLastTry() throws IOException {
    public void testDeletionofStagingOnKillLastTry() throws IOException {
      conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, stagingJobDir);
      conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, stagingJobDir);
-     conf.setInt(YarnConfiguration.RM_AM_MAX_RETRIES, 1);
      fs = mock(FileSystem.class);
      fs = mock(FileSystem.class);
      when(fs.delete(any(Path.class), anyBoolean())).thenReturn(true);
      when(fs.delete(any(Path.class), anyBoolean())).thenReturn(true);
      //Staging Dir exists
      //Staging Dir exists
@@ -142,7 +139,8 @@ import org.junit.Test;
      JobId jobid = recordFactory.newRecordInstance(JobId.class);
      JobId jobid = recordFactory.newRecordInstance(JobId.class);
      jobid.setAppId(appId);
      jobid.setAppId(appId);
      ContainerAllocator mockAlloc = mock(ContainerAllocator.class);
      ContainerAllocator mockAlloc = mock(ContainerAllocator.class);
-     MRAppMaster appMaster = new TestMRApp(attemptId, mockAlloc);
+     MRAppMaster appMaster = new TestMRApp(attemptId, mockAlloc,
+         MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS);
      appMaster.init(conf);
      appMaster.init(conf);
      //simulate the process being killed
      //simulate the process being killed
      MRAppMaster.MRAppMasterShutdownHook hook = 
      MRAppMaster.MRAppMasterShutdownHook hook = 
@@ -155,15 +153,16 @@ import org.junit.Test;
      ContainerAllocator allocator;
      ContainerAllocator allocator;
 
 
      public TestMRApp(ApplicationAttemptId applicationAttemptId, 
      public TestMRApp(ApplicationAttemptId applicationAttemptId, 
-         ContainerAllocator allocator) {
+         ContainerAllocator allocator, int maxAppAttempts) {
        super(applicationAttemptId, BuilderUtils.newContainerId(
        super(applicationAttemptId, BuilderUtils.newContainerId(
-           applicationAttemptId, 1), "testhost", 2222, 3333, System
-           .currentTimeMillis());
+           applicationAttemptId, 1), "testhost", 2222, 3333,
+           System.currentTimeMillis(), maxAppAttempts);
        this.allocator = allocator;
        this.allocator = allocator;
      }
      }
 
 
      public TestMRApp(ApplicationAttemptId applicationAttemptId) {
      public TestMRApp(ApplicationAttemptId applicationAttemptId) {
-       this(applicationAttemptId, null);
+       this(applicationAttemptId, null,
+           MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS);
      }
      }
 
 
      @Override
      @Override

+ 8 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java

@@ -663,5 +663,13 @@ public interface MRJobConfig {
   
   
   public static final String WORKFLOW_ADJACENCY_PREFIX_PATTERN =
   public static final String WORKFLOW_ADJACENCY_PREFIX_PATTERN =
       "^mapreduce\\.workflow\\.adjacency\\..+";
       "^mapreduce\\.workflow\\.adjacency\\..+";
+
+  /**
+   * The maximum number of application attempts.
+   * It is a application-specific setting.
+   */
+  public static final String MR_AM_MAX_ATTEMPTS = "mapreduce.am.max-attempts";
+
+  public static final int DEFAULT_MR_AM_MAX_ATTEMPTS = 1;
   
   
 }
 }

+ 8 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml

@@ -806,6 +806,14 @@
   </description>
   </description>
 </property>
 </property>
 
 
+<property>
+  <name>mapreduce.am.max-attempts</name>
+  <value>1</value>
+  <description>The maximum number of application attempts. It is a
+  application-specific setting. It should not be larger than the global number
+  set by resourcemanager. Otherwise, it will be override.</description>
+</property>
+
 <!-- Job Notification Configuration -->
 <!-- Job Notification Configuration -->
 <property>
 <property>
  <name>mapreduce.job.end-notification.url</name>
  <name>mapreduce.job.end-notification.url</name>

+ 3 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java

@@ -481,6 +481,9 @@ public class YARNRunner implements ClientProtocol {
     appContext.setCancelTokensWhenComplete(
     appContext.setCancelTokensWhenComplete(
         conf.getBoolean(MRJobConfig.JOB_CANCEL_DELEGATION_TOKEN, true));
         conf.getBoolean(MRJobConfig.JOB_CANCEL_DELEGATION_TOKEN, true));
     appContext.setAMContainerSpec(amContainer);         // AM Container
     appContext.setAMContainerSpec(amContainer);         // AM Container
+    appContext.setMaxAppAttempts(
+        conf.getInt(MRJobConfig.MR_AM_MAX_ATTEMPTS,
+            MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS));
 
 
     return appContext;
     return appContext;
   }
   }