Browse Source

MAPREDUCE-3802. Added test to validate that AM can crash multiple times and still can recover successfully after MAPREDUCE-3846. (vinodkv)
svn merge --ignore-ancestry -c 1244178 ../../trunk/


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23.1@1244182 13f79535-47bb-0310-9956-ffa450edef68

Vinod Kumar Vavilapalli 13 years ago
parent
commit
4931fb9ac9

+ 3 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -744,6 +744,9 @@ Release 0.23.1 - 2012-02-08
     MAPREDUCE-3846. Addressed MR AM hanging issues during AM restart and then
     MAPREDUCE-3846. Addressed MR AM hanging issues during AM restart and then
     the recovery. (vinodkv)
     the recovery. (vinodkv)
 
 
+    MAPREDUCE-3802. Added test to validate that AM can crash multiple times and
+    still can recover successfully after MAPREDUCE-3846. (vinodkv)
+
 Release 0.23.0 - 2011-11-01 
 Release 0.23.0 - 2011-11-01 
 
 
   INCOMPATIBLE CHANGES
   INCOMPATIBLE CHANGES

+ 130 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java

@@ -277,6 +277,136 @@ public class TestRecovery {
     // available in the failed attempt should be available here
     // available in the failed attempt should be available here
   }
   }
 
 
+  @Test
+  public void testMultipleCrashes() throws Exception {
+
+    int runCount = 0;
+    MRApp app =
+        new MRAppWithHistory(2, 1, false, this.getClass().getName(), true,
+          ++runCount);
+    Configuration conf = new Configuration();
+    conf.setBoolean("mapred.mapper.new-api", true);
+    conf.setBoolean("mapred.reducer.new-api", true);
+    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+    conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
+    Job job = app.submit(conf);
+    app.waitForState(job, JobState.RUNNING);
+    //all maps would be running
+    Assert.assertEquals("No of tasks not correct",
+       3, job.getTasks().size());
+    Iterator<Task> it = job.getTasks().values().iterator();
+    Task mapTask1 = it.next();
+    Task mapTask2 = it.next();
+    Task reduceTask = it.next();
+    
+    // all maps must be running
+    app.waitForState(mapTask1, TaskState.RUNNING);
+    app.waitForState(mapTask2, TaskState.RUNNING);
+    
+    TaskAttempt task1Attempt1 = mapTask1.getAttempts().values().iterator().next();
+    TaskAttempt task2Attempt = mapTask2.getAttempts().values().iterator().next();
+    
+    //before sending the TA_DONE, event make sure attempt has come to 
+    //RUNNING state
+    app.waitForState(task1Attempt1, TaskAttemptState.RUNNING);
+    app.waitForState(task2Attempt, TaskAttemptState.RUNNING);
+    
+    // reduces must be in NEW state
+    Assert.assertEquals("Reduce Task state not correct",
+        TaskState.RUNNING, reduceTask.getReport().getTaskState());
+
+    //send the done signal to the 1st map
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptEvent(
+          task1Attempt1.getID(),
+          TaskAttemptEventType.TA_DONE));
+
+    //wait for first map task to complete
+    app.waitForState(mapTask1, TaskState.SUCCEEDED);
+    
+    // Crash the app
+    app.stop();
+
+    //rerun
+    //in rerun the 1st map will be recovered from previous run
+    app =
+        new MRAppWithHistory(2, 1, false, this.getClass().getName(), false,
+          ++runCount);
+    conf = new Configuration();
+    conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true);
+    conf.setBoolean("mapred.mapper.new-api", true);
+    conf.setBoolean("mapred.reducer.new-api", true);
+    conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
+    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+    job = app.submit(conf);
+    app.waitForState(job, JobState.RUNNING);
+    //all maps would be running
+    Assert.assertEquals("No of tasks not correct",
+       3, job.getTasks().size());
+    it = job.getTasks().values().iterator();
+    mapTask1 = it.next();
+    mapTask2 = it.next();
+    reduceTask = it.next();
+    
+    // first map will be recovered, no need to send done
+    app.waitForState(mapTask1, TaskState.SUCCEEDED);
+    
+    app.waitForState(mapTask2, TaskState.RUNNING);
+    
+    task2Attempt = mapTask2.getAttempts().values().iterator().next();
+    //before sending the TA_DONE, event make sure attempt has come to 
+    //RUNNING state
+    app.waitForState(task2Attempt, TaskAttemptState.RUNNING);
+    
+    //send the done signal to the 2nd map task
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptEvent(
+            mapTask2.getAttempts().values().iterator().next().getID(),
+            TaskAttemptEventType.TA_DONE));
+    
+    //wait to get it completed
+    app.waitForState(mapTask2, TaskState.SUCCEEDED);
+
+    // Crash the app again.
+    app.stop();
+
+    //rerun
+    //in rerun the 1st and 2nd map will be recovered from previous run
+    app =
+        new MRAppWithHistory(2, 1, false, this.getClass().getName(), false,
+          ++runCount);
+    conf = new Configuration();
+    conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true);
+    conf.setBoolean("mapred.mapper.new-api", true);
+    conf.setBoolean("mapred.reducer.new-api", true);
+    conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
+    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+    job = app.submit(conf);
+    app.waitForState(job, JobState.RUNNING);
+    //all maps would be running
+    Assert.assertEquals("No of tasks not correct",
+       3, job.getTasks().size());
+    it = job.getTasks().values().iterator();
+    mapTask1 = it.next();
+    mapTask2 = it.next();
+    reduceTask = it.next();
+    
+    // The maps will be recovered, no need to send done
+    app.waitForState(mapTask1, TaskState.SUCCEEDED);
+    app.waitForState(mapTask2, TaskState.SUCCEEDED);
+
+    //wait for reduce to be running before sending done
+    app.waitForState(reduceTask, TaskState.RUNNING);
+    //send the done signal to the reduce
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptEvent(
+            reduceTask.getAttempts().values().iterator().next().getID(),
+            TaskAttemptEventType.TA_DONE));
+    
+    app.waitForState(job, JobState.SUCCEEDED);
+    app.verifyCompleted();
+  }
+
   @Test
   @Test
   public void testOutputRecovery() throws Exception {
   public void testOutputRecovery() throws Exception {
     int runCount = 0;
     int runCount = 0;