|
@@ -626,6 +626,115 @@ public class TestRecovery {
|
|
|
validateOutput();
|
|
|
}
|
|
|
|
|
|
+ @Test
|
|
|
+ public void testRecoveryWithOldCommiter() throws Exception {
|
|
|
+ int runCount = 0;
|
|
|
+ MRApp app = new MRAppWithHistory(1, 2, false, this.getClass().getName(),
|
|
|
+ true, ++runCount);
|
|
|
+ Configuration conf = new Configuration();
|
|
|
+ conf.setBoolean("mapred.mapper.new-api", false);
|
|
|
+ conf.setBoolean("mapred.reducer.new-api", false);
|
|
|
+ conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
|
|
|
+ conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
|
|
|
+ Job job = app.submit(conf);
|
|
|
+ app.waitForState(job, JobState.RUNNING);
|
|
|
+ Assert.assertEquals("No of tasks not correct",
|
|
|
+ 3, job.getTasks().size());
|
|
|
+ Iterator<Task> it = job.getTasks().values().iterator();
|
|
|
+ Task mapTask1 = it.next();
|
|
|
+ Task reduceTask1 = it.next();
|
|
|
+
|
|
|
+ // all maps must be running
|
|
|
+ app.waitForState(mapTask1, TaskState.RUNNING);
|
|
|
+
|
|
|
+ TaskAttempt task1Attempt1 = mapTask1.getAttempts().values().iterator()
|
|
|
+ .next();
|
|
|
+
|
|
|
+ //before sending the TA_DONE, event make sure attempt has come to
|
|
|
+ //RUNNING state
|
|
|
+ app.waitForState(task1Attempt1, TaskAttemptState.RUNNING);
|
|
|
+
|
|
|
+ //send the done signal to the map
|
|
|
+ app.getContext().getEventHandler().handle(
|
|
|
+ new TaskAttemptEvent(
|
|
|
+ task1Attempt1.getID(),
|
|
|
+ TaskAttemptEventType.TA_DONE));
|
|
|
+
|
|
|
+ //wait for map task to complete
|
|
|
+ app.waitForState(mapTask1, TaskState.SUCCEEDED);
|
|
|
+
|
|
|
+ // Verify the shuffle-port
|
|
|
+ Assert.assertEquals(5467, task1Attempt1.getShufflePort());
|
|
|
+
|
|
|
+ app.waitForState(reduceTask1, TaskState.RUNNING);
|
|
|
+ TaskAttempt reduce1Attempt1 = reduceTask1.getAttempts().values().iterator().next();
|
|
|
+
|
|
|
+ // write output corresponding to reduce1
|
|
|
+ writeOutput(reduce1Attempt1, conf);
|
|
|
+
|
|
|
+ //send the done signal to the 1st reduce
|
|
|
+ app.getContext().getEventHandler().handle(
|
|
|
+ new TaskAttemptEvent(
|
|
|
+ reduce1Attempt1.getID(),
|
|
|
+ TaskAttemptEventType.TA_DONE));
|
|
|
+
|
|
|
+ //wait for first reduce task to complete
|
|
|
+ app.waitForState(reduceTask1, TaskState.SUCCEEDED);
|
|
|
+
|
|
|
+ //stop the app before the job completes.
|
|
|
+ app.stop();
|
|
|
+
|
|
|
+ //rerun
|
|
|
+ //in rerun the map will be recovered from previous run
|
|
|
+ app = new MRAppWithHistory(1, 2, false, this.getClass().getName(), false,
|
|
|
+ ++runCount);
|
|
|
+ conf = new Configuration();
|
|
|
+ conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true);
|
|
|
+ conf.setBoolean("mapred.mapper.new-api", false);
|
|
|
+ conf.setBoolean("mapred.reducer.new-api", false);
|
|
|
+ conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
|
|
|
+ conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
|
|
|
+ job = app.submit(conf);
|
|
|
+ app.waitForState(job, JobState.RUNNING);
|
|
|
+ Assert.assertEquals("No of tasks not correct",
|
|
|
+ 3, job.getTasks().size());
|
|
|
+ it = job.getTasks().values().iterator();
|
|
|
+ mapTask1 = it.next();
|
|
|
+ reduceTask1 = it.next();
|
|
|
+ Task reduceTask2 = it.next();
|
|
|
+
|
|
|
+ // map will be recovered, no need to send done
|
|
|
+ app.waitForState(mapTask1, TaskState.SUCCEEDED);
|
|
|
+
|
|
|
+ // Verify the shuffle-port after recovery
|
|
|
+ task1Attempt1 = mapTask1.getAttempts().values().iterator().next();
|
|
|
+ Assert.assertEquals(5467, task1Attempt1.getShufflePort());
|
|
|
+
|
|
|
+ // first reduce will be recovered, no need to send done
|
|
|
+ app.waitForState(reduceTask1, TaskState.SUCCEEDED);
|
|
|
+
|
|
|
+ app.waitForState(reduceTask2, TaskState.RUNNING);
|
|
|
+
|
|
|
+ TaskAttempt reduce2Attempt = reduceTask2.getAttempts().values()
|
|
|
+ .iterator().next();
|
|
|
+ //before sending the TA_DONE, event make sure attempt has come to
|
|
|
+ //RUNNING state
|
|
|
+ app.waitForState(reduce2Attempt, TaskAttemptState.RUNNING);
|
|
|
+
|
|
|
+ //send the done signal to the 2nd reduce task
|
|
|
+ app.getContext().getEventHandler().handle(
|
|
|
+ new TaskAttemptEvent(
|
|
|
+ reduce2Attempt.getID(),
|
|
|
+ TaskAttemptEventType.TA_DONE));
|
|
|
+
|
|
|
+ //wait to get it completed
|
|
|
+ app.waitForState(reduceTask2, TaskState.SUCCEEDED);
|
|
|
+
|
|
|
+ app.waitForState(job, JobState.SUCCEEDED);
|
|
|
+ app.verifyCompleted();
|
|
|
+ validateOutput();
|
|
|
+ }
|
|
|
+
|
|
|
private void writeBadOutput(TaskAttempt attempt, Configuration conf)
|
|
|
throws Exception {
|
|
|
TaskAttemptContext tContext = new TaskAttemptContextImpl(conf,
|