|
@@ -425,6 +425,266 @@ public class TestRecovery {
|
|
|
app.verifyCompleted();
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * The class provides a custom implementation of output committer setupTask
|
|
|
+ * and isRecoverySupported methods, which determines if recovery supported
|
|
|
+ * based on config property.
|
|
|
+ */
|
|
|
+ public static class TestFileOutputCommitter extends
|
|
|
+ org.apache.hadoop.mapred.FileOutputCommitter {
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public boolean isRecoverySupported(
|
|
|
+ org.apache.hadoop.mapred.JobContext jobContext) {
|
|
|
+ boolean isRecoverySupported = false;
|
|
|
+ if (jobContext != null && jobContext.getConfiguration() != null) {
|
|
|
+ isRecoverySupported = jobContext.getConfiguration().getBoolean(
|
|
|
+ "want.am.recovery", false);
|
|
|
+ }
|
|
|
+ return isRecoverySupported;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * This test case primarily verifies if the recovery is controlled through config
|
|
|
+ * property. In this case, recover is turned ON. AM with 3 maps and 0 reduce.
|
|
|
+ * AM crashes after the first two tasks finishes and recovers completely and
|
|
|
+ * succeeds in the second generation.
|
|
|
+ *
|
|
|
+ * @throws Exception
|
|
|
+ */
|
|
|
+ @Test
|
|
|
+ public void testRecoverySuccessUsingCustomOutputCommitter() throws Exception {
|
|
|
+ int runCount = 0;
|
|
|
+ MRApp app = new MRAppWithHistory(3, 0, false, this.getClass().getName(),
|
|
|
+ true, ++runCount);
|
|
|
+ Configuration conf = new Configuration();
|
|
|
+ conf.setClass("mapred.output.committer.class",
|
|
|
+ TestFileOutputCommitter.class,
|
|
|
+ org.apache.hadoop.mapred.OutputCommitter.class);
|
|
|
+ conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
|
|
|
+ conf.setBoolean("want.am.recovery", true);
|
|
|
+ Job job = app.submit(conf);
|
|
|
+ app.waitForState(job, JobState.RUNNING);
|
|
|
+
|
|
|
+ // all maps would be running
|
|
|
+ Assert.assertEquals("No of tasks not correct", 3, job.getTasks().size());
|
|
|
+ Iterator<Task> it = job.getTasks().values().iterator();
|
|
|
+ Task mapTask1 = it.next();
|
|
|
+ Task mapTask2 = it.next();
|
|
|
+ Task mapTask3 = it.next();
|
|
|
+
|
|
|
+ // all maps must be running
|
|
|
+ app.waitForState(mapTask1, TaskState.RUNNING);
|
|
|
+ app.waitForState(mapTask2, TaskState.RUNNING);
|
|
|
+ app.waitForState(mapTask3, TaskState.RUNNING);
|
|
|
+
|
|
|
+ TaskAttempt task1Attempt = mapTask1.getAttempts().values().iterator()
|
|
|
+ .next();
|
|
|
+ TaskAttempt task2Attempt = mapTask2.getAttempts().values().iterator()
|
|
|
+ .next();
|
|
|
+ TaskAttempt task3Attempt = mapTask3.getAttempts().values().iterator()
|
|
|
+ .next();
|
|
|
+
|
|
|
+ // before sending the TA_DONE, event make sure attempt has come to
|
|
|
+ // RUNNING state
|
|
|
+ app.waitForState(task1Attempt, TaskAttemptState.RUNNING);
|
|
|
+ app.waitForState(task2Attempt, TaskAttemptState.RUNNING);
|
|
|
+ app.waitForState(task3Attempt, TaskAttemptState.RUNNING);
|
|
|
+
|
|
|
+ // send the done signal to the 1st two maps
|
|
|
+ app.getContext()
|
|
|
+ .getEventHandler()
|
|
|
+ .handle(
|
|
|
+ new TaskAttemptEvent(task1Attempt.getID(),
|
|
|
+ TaskAttemptEventType.TA_DONE));
|
|
|
+ app.getContext()
|
|
|
+ .getEventHandler()
|
|
|
+ .handle(
|
|
|
+ new TaskAttemptEvent(task2Attempt.getID(),
|
|
|
+ TaskAttemptEventType.TA_DONE));
|
|
|
+
|
|
|
+ // wait for first two map task to complete
|
|
|
+ app.waitForState(mapTask1, TaskState.SUCCEEDED);
|
|
|
+ app.waitForState(mapTask2, TaskState.SUCCEEDED);
|
|
|
+
|
|
|
+ // stop the app
|
|
|
+ app.stop();
|
|
|
+
|
|
|
+ // rerun
|
|
|
+ // in rerun the 1st two map will be recovered from previous run
|
|
|
+ app = new MRAppWithHistory(2, 1, false, this.getClass().getName(), false,
|
|
|
+ ++runCount);
|
|
|
+ conf = new Configuration();
|
|
|
+ conf.setClass("mapred.output.committer.class",
|
|
|
+ TestFileOutputCommitter.class,
|
|
|
+ org.apache.hadoop.mapred.OutputCommitter.class);
|
|
|
+ conf.setBoolean("want.am.recovery", true);
|
|
|
+ conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
|
|
|
+ // Set num-reduces explicitly in conf as recovery logic depends on it.
|
|
|
+ conf.setInt(MRJobConfig.NUM_REDUCES, 0);
|
|
|
+ conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
|
|
|
+ job = app.submit(conf);
|
|
|
+ app.waitForState(job, JobState.RUNNING);
|
|
|
+
|
|
|
+ Assert.assertEquals("No of tasks not correct", 3, job.getTasks().size());
|
|
|
+ it = job.getTasks().values().iterator();
|
|
|
+ mapTask1 = it.next();
|
|
|
+ mapTask2 = it.next();
|
|
|
+ mapTask3 = it.next();
|
|
|
+
|
|
|
+ // first two maps will be recovered, no need to send done
|
|
|
+ app.waitForState(mapTask1, TaskState.SUCCEEDED);
|
|
|
+ app.waitForState(mapTask2, TaskState.SUCCEEDED);
|
|
|
+
|
|
|
+ app.waitForState(mapTask3, TaskState.RUNNING);
|
|
|
+
|
|
|
+ task3Attempt = mapTask3.getAttempts().values().iterator().next();
|
|
|
+ // before sending the TA_DONE, event make sure attempt has come to
|
|
|
+ // RUNNING state
|
|
|
+ app.waitForState(task3Attempt, TaskAttemptState.RUNNING);
|
|
|
+
|
|
|
+ // send the done signal to the 3rd map task
|
|
|
+ app.getContext()
|
|
|
+ .getEventHandler()
|
|
|
+ .handle(
|
|
|
+ new TaskAttemptEvent(mapTask3.getAttempts().values().iterator()
|
|
|
+ .next().getID(), TaskAttemptEventType.TA_DONE));
|
|
|
+
|
|
|
+ // wait to get it completed
|
|
|
+ app.waitForState(mapTask3, TaskState.SUCCEEDED);
|
|
|
+
|
|
|
+ app.waitForState(job, JobState.SUCCEEDED);
|
|
|
+ app.verifyCompleted();
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * This test case primarily verifies if the recovery is controlled through config
|
|
|
+ * property. In this case, recover is turned OFF. AM with 3 maps and 0 reduce.
|
|
|
+ * AM crashes after the first two tasks finishes and recovery fails and have
|
|
|
+ * to rerun fully in the second generation and succeeds.
|
|
|
+ *
|
|
|
+ * @throws Exception
|
|
|
+ */
|
|
|
+ @Test
|
|
|
+ public void testRecoveryFailsUsingCustomOutputCommitter() throws Exception {
|
|
|
+ int runCount = 0;
|
|
|
+ MRApp app =
|
|
|
+ new MRAppWithHistory(3, 0, false, this.getClass().getName(), true,
|
|
|
+ ++runCount);
|
|
|
+ Configuration conf = new Configuration();
|
|
|
+ conf.setClass("mapred.output.committer.class", TestFileOutputCommitter.class,
|
|
|
+ org.apache.hadoop.mapred.OutputCommitter.class);
|
|
|
+ conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
|
|
|
+ conf.setBoolean("want.am.recovery", false);
|
|
|
+ Job job = app.submit(conf);
|
|
|
+ app.waitForState(job, JobState.RUNNING);
|
|
|
+
|
|
|
+ // all maps would be running
|
|
|
+ Assert.assertEquals("No of tasks not correct", 3, job.getTasks().size());
|
|
|
+ Iterator<Task> it = job.getTasks().values().iterator();
|
|
|
+ Task mapTask1 = it.next();
|
|
|
+ Task mapTask2 = it.next();
|
|
|
+ Task mapTask3 = it.next();
|
|
|
+
|
|
|
+ // all maps must be running
|
|
|
+ app.waitForState(mapTask1, TaskState.RUNNING);
|
|
|
+ app.waitForState(mapTask2, TaskState.RUNNING);
|
|
|
+ app.waitForState(mapTask3, TaskState.RUNNING);
|
|
|
+
|
|
|
+ TaskAttempt task1Attempt =
|
|
|
+ mapTask1.getAttempts().values().iterator().next();
|
|
|
+ TaskAttempt task2Attempt =
|
|
|
+ mapTask2.getAttempts().values().iterator().next();
|
|
|
+ TaskAttempt task3Attempt =
|
|
|
+ mapTask3.getAttempts().values().iterator().next();
|
|
|
+
|
|
|
+ // before sending the TA_DONE, event make sure attempt has come to
|
|
|
+ // RUNNING state
|
|
|
+ app.waitForState(task1Attempt, TaskAttemptState.RUNNING);
|
|
|
+ app.waitForState(task2Attempt, TaskAttemptState.RUNNING);
|
|
|
+ app.waitForState(task3Attempt, TaskAttemptState.RUNNING);
|
|
|
+
|
|
|
+ // send the done signal to the 1st two maps
|
|
|
+ app
|
|
|
+ .getContext()
|
|
|
+ .getEventHandler()
|
|
|
+ .handle(
|
|
|
+ new TaskAttemptEvent(task1Attempt.getID(), TaskAttemptEventType.TA_DONE));
|
|
|
+ app
|
|
|
+ .getContext()
|
|
|
+ .getEventHandler()
|
|
|
+ .handle(
|
|
|
+ new TaskAttemptEvent(task2Attempt.getID(), TaskAttemptEventType.TA_DONE));
|
|
|
+
|
|
|
+ // wait for first two map task to complete
|
|
|
+ app.waitForState(mapTask1, TaskState.SUCCEEDED);
|
|
|
+ app.waitForState(mapTask2, TaskState.SUCCEEDED);
|
|
|
+
|
|
|
+ // stop the app
|
|
|
+ app.stop();
|
|
|
+
|
|
|
+ // rerun
|
|
|
+ // in rerun the 1st two map will be recovered from previous run
|
|
|
+ app =
|
|
|
+ new MRAppWithHistory(2, 1, false, this.getClass().getName(), false,
|
|
|
+ ++runCount);
|
|
|
+ conf = new Configuration();
|
|
|
+ conf.setClass("mapred.output.committer.class", TestFileOutputCommitter.class,
|
|
|
+ org.apache.hadoop.mapred.OutputCommitter.class);
|
|
|
+ conf.setBoolean("want.am.recovery", false);
|
|
|
+ conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
|
|
|
+ // Set num-reduces explicitly in conf as recovery logic depends on it.
|
|
|
+ conf.setInt(MRJobConfig.NUM_REDUCES, 0);
|
|
|
+ conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
|
|
|
+ job = app.submit(conf);
|
|
|
+ app.waitForState(job, JobState.RUNNING);
|
|
|
+
|
|
|
+ Assert.assertEquals("No of tasks not correct", 3, job.getTasks().size());
|
|
|
+ it = job.getTasks().values().iterator();
|
|
|
+ mapTask1 = it.next();
|
|
|
+ mapTask2 = it.next();
|
|
|
+ mapTask3 = it.next();
|
|
|
+
|
|
|
+ // first two maps will NOT be recovered, need to send done from them
|
|
|
+ app.waitForState(mapTask1, TaskState.RUNNING);
|
|
|
+ app.waitForState(mapTask2, TaskState.RUNNING);
|
|
|
+
|
|
|
+ app.waitForState(mapTask3, TaskState.RUNNING);
|
|
|
+
|
|
|
+ task3Attempt = mapTask3.getAttempts().values().iterator().next();
|
|
|
+ // before sending the TA_DONE, event make sure attempt has come to
|
|
|
+ // RUNNING state
|
|
|
+ app.waitForState(task3Attempt, TaskAttemptState.RUNNING);
|
|
|
+
|
|
|
+ // send the done signal to all 3 tasks map task
|
|
|
+ app
|
|
|
+ .getContext()
|
|
|
+ .getEventHandler()
|
|
|
+ .handle(
|
|
|
+ new TaskAttemptEvent(mapTask1.getAttempts().values().iterator().next()
|
|
|
+ .getID(), TaskAttemptEventType.TA_DONE));
|
|
|
+ app
|
|
|
+ .getContext()
|
|
|
+ .getEventHandler()
|
|
|
+ .handle(
|
|
|
+ new TaskAttemptEvent(mapTask2.getAttempts().values().iterator().next()
|
|
|
+ .getID(), TaskAttemptEventType.TA_DONE));
|
|
|
+
|
|
|
+ app
|
|
|
+ .getContext()
|
|
|
+ .getEventHandler()
|
|
|
+ .handle(
|
|
|
+ new TaskAttemptEvent(mapTask3.getAttempts().values().iterator().next()
|
|
|
+ .getID(), TaskAttemptEventType.TA_DONE));
|
|
|
+
|
|
|
+ // wait to get it completed
|
|
|
+ app.waitForState(mapTask3, TaskState.SUCCEEDED);
|
|
|
+
|
|
|
+ app.waitForState(job, JobState.SUCCEEDED);
|
|
|
+ app.verifyCompleted();
|
|
|
+ }
|
|
|
+
|
|
|
@Test
|
|
|
public void testMultipleCrashes() throws Exception {
|
|
|
|