소스 검색

Merge -c 1516358 from trunk to branch-2.1-beta to fix MAPREDUCE-5468. Fix MR AM recovery for map-only jobs. Contributed by Vinod K. V.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2.1-beta@1516360 13f79535-47bb-0310-9956-ffa450edef68
Arun Murthy 11 년 전
부모
커밋
215a16e08e

+ 3 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -58,6 +58,9 @@ Release 2.1.1-beta - UNRELEASED
     pick up the right history file for the last successful AM. (Jian He via
     pick up the right history file for the last successful AM. (Jian He via
     vinodkv)
     vinodkv)
 
 
+    MAPREDUCE-5468. Fix MR AM recovery for map-only jobs. (vinodkv via
+    acmurthy)
+
 Release 2.1.0-beta - 2013-08-22
 Release 2.1.0-beta - 2013-08-22
 
 
   INCOMPATIBLE CHANGES
   INCOMPATIBLE CHANGES

+ 5 - 4
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java

@@ -1042,11 +1042,11 @@ public class MRAppMaster extends CompositeService {
     // attempt will generate one.  However that disables recovery if there
     // attempt will generate one.  However that disables recovery if there
     // are reducers as the shuffle secret would be app attempt specific.
     // are reducers as the shuffle secret would be app attempt specific.
     int numReduceTasks = getConfig().getInt(MRJobConfig.NUM_REDUCES, 0);
     int numReduceTasks = getConfig().getInt(MRJobConfig.NUM_REDUCES, 0);
-    boolean shuffleKeyValidForRecovery = (numReduceTasks > 0 &&
-        TokenCache.getShuffleSecretKey(jobCredentials) != null);
+    boolean shuffleKeyValidForRecovery =
+        TokenCache.getShuffleSecretKey(jobCredentials) != null;
 
 
     if (recoveryEnabled && recoverySupportedByCommitter
     if (recoveryEnabled && recoverySupportedByCommitter
-          && shuffleKeyValidForRecovery) {
+        && (numReduceTasks <= 0 || shuffleKeyValidForRecovery)) {
       LOG.info("Recovery is enabled. "
       LOG.info("Recovery is enabled. "
           + "Will try to recover from previous life on best effort basis.");
           + "Will try to recover from previous life on best effort basis.");
       try {
       try {
@@ -1059,7 +1059,8 @@ public class MRAppMaster extends CompositeService {
     } else {
     } else {
       LOG.info("Will not try to recover. recoveryEnabled: "
       LOG.info("Will not try to recover. recoveryEnabled: "
             + recoveryEnabled + " recoverySupportedByCommitter: "
             + recoveryEnabled + " recoverySupportedByCommitter: "
-            + recoverySupportedByCommitter + " shuffleKeyValidForRecovery: "
+            + recoverySupportedByCommitter + " numReduceTasks: "
+            + numReduceTasks + " shuffleKeyValidForRecovery: "
             + shuffleKeyValidForRecovery + " ApplicationAttemptID: "
             + shuffleKeyValidForRecovery + " ApplicationAttemptID: "
             + appAttemptID.getAttemptId());
             + appAttemptID.getAttemptId());
       // Get the amInfos anyways whether recovery is enabled or not
       // Get the amInfos anyways whether recovery is enabled or not

+ 110 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java

@@ -114,7 +114,6 @@ public class TestRecovery {
   private Text val1 = new Text("val1");
   private Text val1 = new Text("val1");
   private Text val2 = new Text("val2");
   private Text val2 = new Text("val2");
 
 
-
   /**
   /**
    * AM with 2 maps and 1 reduce. For 1st map, one attempt fails, one attempt
    * AM with 2 maps and 1 reduce. For 1st map, one attempt fails, one attempt
    * completely disappears because of failed launch, one attempt gets killed and
    * completely disappears because of failed launch, one attempt gets killed and
@@ -316,6 +315,116 @@ public class TestRecovery {
     // available in the failed attempt should be available here
     // available in the failed attempt should be available here
   }
   }
 
 
+  /**
+   * AM with 3 maps and 0 reduce. AM crashes after the first two tasks finishes
+   * and recovers completely and succeeds in the second generation.
+   * 
+   * @throws Exception
+   */
+  @Test
+  public void testCrashOfMapsOnlyJob() throws Exception {
+    int runCount = 0;
+    MRApp app =
+        new MRAppWithHistory(3, 0, false, this.getClass().getName(), true,
+          ++runCount);
+    Configuration conf = new Configuration();
+    conf.setBoolean("mapred.mapper.new-api", true);
+    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+    conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
+    Job job = app.submit(conf);
+    app.waitForState(job, JobState.RUNNING);
+
+    // all maps would be running
+    Assert.assertEquals("No of tasks not correct", 3, job.getTasks().size());
+    Iterator<Task> it = job.getTasks().values().iterator();
+    Task mapTask1 = it.next();
+    Task mapTask2 = it.next();
+    Task mapTask3 = it.next();
+
+    // all maps must be running
+    app.waitForState(mapTask1, TaskState.RUNNING);
+    app.waitForState(mapTask2, TaskState.RUNNING);
+    app.waitForState(mapTask3, TaskState.RUNNING);
+
+    TaskAttempt task1Attempt =
+        mapTask1.getAttempts().values().iterator().next();
+    TaskAttempt task2Attempt =
+        mapTask2.getAttempts().values().iterator().next();
+    TaskAttempt task3Attempt =
+        mapTask3.getAttempts().values().iterator().next();
+
+    // before sending the TA_DONE, event make sure attempt has come to
+    // RUNNING state
+    app.waitForState(task1Attempt, TaskAttemptState.RUNNING);
+    app.waitForState(task2Attempt, TaskAttemptState.RUNNING);
+    app.waitForState(task3Attempt, TaskAttemptState.RUNNING);
+
+    // send the done signal to the 1st two maps
+    app
+      .getContext()
+      .getEventHandler()
+      .handle(
+        new TaskAttemptEvent(task1Attempt.getID(), TaskAttemptEventType.TA_DONE));
+    app
+      .getContext()
+      .getEventHandler()
+      .handle(
+        new TaskAttemptEvent(task2Attempt.getID(), TaskAttemptEventType.TA_DONE));
+
+    // wait for first two map task to complete
+    app.waitForState(mapTask1, TaskState.SUCCEEDED);
+    app.waitForState(mapTask2, TaskState.SUCCEEDED);
+
+    // stop the app
+    app.stop();
+
+    // rerun
+    // in rerun the 1st two map will be recovered from previous run
+    app =
+        new MRAppWithHistory(2, 1, false, this.getClass().getName(), false,
+          ++runCount);
+    conf = new Configuration();
+    conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true);
+    conf.setBoolean("mapred.mapper.new-api", true);
+    conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
+    // Set num-reduces explicitly in conf as recovery logic depends on it.
+    conf.setInt(MRJobConfig.NUM_REDUCES, 0);
+    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+    job = app.submit(conf);
+    app.waitForState(job, JobState.RUNNING);
+
+    Assert.assertEquals("No of tasks not correct", 3, job.getTasks().size());
+    it = job.getTasks().values().iterator();
+    mapTask1 = it.next();
+    mapTask2 = it.next();
+    mapTask3 = it.next();
+
+    // first two maps will be recovered, no need to send done
+    app.waitForState(mapTask1, TaskState.SUCCEEDED);
+    app.waitForState(mapTask2, TaskState.SUCCEEDED);
+
+    app.waitForState(mapTask3, TaskState.RUNNING);
+
+    task3Attempt = mapTask3.getAttempts().values().iterator().next();
+    // before sending the TA_DONE, event make sure attempt has come to
+    // RUNNING state
+    app.waitForState(task3Attempt, TaskAttemptState.RUNNING);
+
+    // send the done signal to the 3rd map task
+    app
+      .getContext()
+      .getEventHandler()
+      .handle(
+        new TaskAttemptEvent(mapTask3.getAttempts().values().iterator().next()
+          .getID(), TaskAttemptEventType.TA_DONE));
+
+    // wait to get it completed
+    app.waitForState(mapTask3, TaskState.SUCCEEDED);
+
+    app.waitForState(job, JobState.SUCCEEDED);
+    app.verifyCompleted();
+  }
+
   @Test
   @Test
   public void testMultipleCrashes() throws Exception {
   public void testMultipleCrashes() throws Exception {