
svn merge -c 1516358 FIXES: MAPREDUCE-5468. Fix MR AM recovery for map-only jobs. Contributed by Vinod K. V.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1516469 13f79535-47bb-0310-9956-ffa450edef68
Jason Darrell Lowe, 11 years ago
parent commit b2d4111e38

+ 3 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -35,6 +35,9 @@ Release 0.23.10 - UNRELEASED
     MAPREDUCE-5001. LocalJobRunner has race condition resulting in job
     failures (Sandy Ryza via jlowe)
 
+    MAPREDUCE-5468. Fix MR AM recovery for map-only jobs. (vinodkv via
+    acmurthy)
+
 Release 0.23.9 - 2013-07-08
 
   INCOMPATIBLE CHANGES

+ 5 - 4
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java

@@ -1018,11 +1018,11 @@ public class MRAppMaster extends CompositeService {
     // attempt will generate one.  However that disables recovery if there
     // are reducers as the shuffle secret would be app attempt specific.
     int numReduceTasks = getConfig().getInt(MRJobConfig.NUM_REDUCES, 0);
-    boolean shuffleKeyValidForRecovery = (numReduceTasks > 0 &&
-        TokenCache.getShuffleSecretKey(fsTokens) != null);
+    boolean shuffleKeyValidForRecovery =
+        TokenCache.getShuffleSecretKey(fsTokens) != null;
 
     if (recoveryEnabled && recoverySupportedByCommitter
-        && shuffleKeyValidForRecovery) {
+        && (numReduceTasks <= 0 || shuffleKeyValidForRecovery)) {
       LOG.info("Recovery is enabled. "
           + "Will try to recover from previous life on best effort basis.");
       try {
@@ -1035,7 +1035,8 @@ public class MRAppMaster extends CompositeService {
     } else {
       LOG.info("Will not try to recover. recoveryEnabled: "
             + recoveryEnabled + " recoverySupportedByCommitter: "
-            + recoverySupportedByCommitter + " shuffleKeyValidForRecovery: "
+            + recoverySupportedByCommitter + " numReduceTasks: "
+            + numReduceTasks + " shuffleKeyValidForRecovery: "
             + shuffleKeyValidForRecovery + " ApplicationAttemptID: "
             + appAttemptID.getAttemptId());
       // Get the amInfos anyways whether recovery is enabled or not
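In short, the hunk above relaxes the recovery eligibility check: a missing shuffle secret now only blocks recovery when the job actually has reducers. Below is a minimal, self-contained sketch of the old and new predicates; the variable names mirror MRAppMaster but the values are stand-ins for illustration, this is not the committed code.

// Sketch only: shows how the recovery decision changes for a map-only job.
public class RecoveryDecisionSketch {
  public static void main(String[] args) {
    boolean recoveryEnabled = true;               // MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE
    boolean recoverySupportedByCommitter = true;  // committer.isRecoverySupported()
    boolean shuffleKeyPresent = false;            // TokenCache.getShuffleSecretKey(fsTokens) != null
    int numReduceTasks = 0;                       // map-only job

    // Before the patch: the shuffle-key check was tied to numReduceTasks > 0,
    // so it was always false for a map-only job and recovery never ran.
    boolean before = recoveryEnabled && recoverySupportedByCommitter
        && (numReduceTasks > 0 && shuffleKeyPresent);

    // After the patch: the shuffle key is only required when reducers exist.
    boolean after = recoveryEnabled && recoverySupportedByCommitter
        && (numReduceTasks <= 0 || shuffleKeyPresent);

    System.out.println("recover before patch: " + before); // false
    System.out.println("recover after patch:  " + after);  // true
  }
}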

+ 110 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java

@@ -116,7 +116,6 @@ public class TestRecovery {
   private Text val1 = new Text("val1");
   private Text val2 = new Text("val2");
 
-
   /**
    * AM with 2 maps and 1 reduce. For 1st map, one attempt fails, one attempt
    * completely disappears because of failed launch, one attempt gets killed and
@@ -318,6 +317,116 @@ public class TestRecovery {
     // available in the failed attempt should be available here
   }
 
+  /**
+   * AM with 3 maps and 0 reduces. The AM crashes after the first two tasks
+   * finish, then recovers completely and succeeds in the second generation.
+   * 
+   * @throws Exception
+   */
+  @Test
+  public void testCrashOfMapsOnlyJob() throws Exception {
+    int runCount = 0;
+    MRApp app =
+        new MRAppWithHistory(3, 0, false, this.getClass().getName(), true,
+          ++runCount);
+    Configuration conf = new Configuration();
+    conf.setBoolean("mapred.mapper.new-api", true);
+    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+    conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
+    Job job = app.submit(conf);
+    app.waitForState(job, JobState.RUNNING);
+
+    // the job should consist of exactly 3 map tasks
+    Assert.assertEquals("No of tasks not correct", 3, job.getTasks().size());
+    Iterator<Task> it = job.getTasks().values().iterator();
+    Task mapTask1 = it.next();
+    Task mapTask2 = it.next();
+    Task mapTask3 = it.next();
+
+    // all maps must be running
+    app.waitForState(mapTask1, TaskState.RUNNING);
+    app.waitForState(mapTask2, TaskState.RUNNING);
+    app.waitForState(mapTask3, TaskState.RUNNING);
+
+    TaskAttempt task1Attempt =
+        mapTask1.getAttempts().values().iterator().next();
+    TaskAttempt task2Attempt =
+        mapTask2.getAttempts().values().iterator().next();
+    TaskAttempt task3Attempt =
+        mapTask3.getAttempts().values().iterator().next();
+
+    // before sending the TA_DONE event, make sure each attempt has reached
+    // the RUNNING state
+    app.waitForState(task1Attempt, TaskAttemptState.RUNNING);
+    app.waitForState(task2Attempt, TaskAttemptState.RUNNING);
+    app.waitForState(task3Attempt, TaskAttemptState.RUNNING);
+
+    // send the done signal to the 1st two maps
+    app
+      .getContext()
+      .getEventHandler()
+      .handle(
+        new TaskAttemptEvent(task1Attempt.getID(), TaskAttemptEventType.TA_DONE));
+    app
+      .getContext()
+      .getEventHandler()
+      .handle(
+        new TaskAttemptEvent(task2Attempt.getID(), TaskAttemptEventType.TA_DONE));
+
+    // wait for the first two map tasks to complete
+    app.waitForState(mapTask1, TaskState.SUCCEEDED);
+    app.waitForState(mapTask2, TaskState.SUCCEEDED);
+
+    // stop the app
+    app.stop();
+
+    // rerun
+    // in the rerun, the first two maps will be recovered from the previous run
+    app =
+        new MRAppWithHistory(2, 1, false, this.getClass().getName(), false,
+          ++runCount);
+    conf = new Configuration();
+    conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true);
+    conf.setBoolean("mapred.mapper.new-api", true);
+    conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
+    // Set num-reduces explicitly in conf as recovery logic depends on it.
+    conf.setInt(MRJobConfig.NUM_REDUCES, 0);
+    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+    job = app.submit(conf);
+    app.waitForState(job, JobState.RUNNING);
+
+    Assert.assertEquals("No of tasks not correct", 3, job.getTasks().size());
+    it = job.getTasks().values().iterator();
+    mapTask1 = it.next();
+    mapTask2 = it.next();
+    mapTask3 = it.next();
+
+    // first two maps will be recovered, no need to send done
+    app.waitForState(mapTask1, TaskState.SUCCEEDED);
+    app.waitForState(mapTask2, TaskState.SUCCEEDED);
+
+    app.waitForState(mapTask3, TaskState.RUNNING);
+
+    task3Attempt = mapTask3.getAttempts().values().iterator().next();
+    // before sending the TA_DONE event, make sure the attempt has reached
+    // the RUNNING state
+    app.waitForState(task3Attempt, TaskAttemptState.RUNNING);
+
+    // send the done signal to the 3rd map task
+    app
+      .getContext()
+      .getEventHandler()
+      .handle(
+        new TaskAttemptEvent(mapTask3.getAttempts().values().iterator().next()
+          .getID(), TaskAttemptEventType.TA_DONE));
+
+    // wait for it to complete
+    app.waitForState(mapTask3, TaskState.SUCCEEDED);
+
+    app.waitForState(job, JobState.SUCCEEDED);
+    app.verifyCompleted();
+  }
+
   @Test
   public void testMultipleCrashes() throws Exception {
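The new test can be run on its own from the hadoop-mapreduce-client-app module with Maven Surefire's test filter, e.g. mvn test -Dtest=TestRecovery (assuming a standard Maven build of branch-0.23); the MRApp test harness runs the AM in-process, so no cluster is needed.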