|
@@ -44,6 +44,7 @@ import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
|
|
|
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
|
|
|
import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
|
|
|
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
|
|
|
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
|
|
|
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
|
|
|
import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
|
|
|
import org.apache.hadoop.mapreduce.v2.app.job.Job;
|
|
@@ -52,6 +53,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
|
|
|
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
|
|
|
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
|
|
|
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher;
|
|
|
+import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent;
|
|
|
import org.apache.hadoop.mapreduce.v2.app.recover.Recovery;
|
|
|
import org.apache.hadoop.mapreduce.v2.app.recover.RecoveryService;
|
|
|
import org.apache.hadoop.util.ReflectionUtils;
|
|
@@ -74,7 +76,14 @@ public class TestRecovery {
|
|
|
private Text val1 = new Text("val1");
|
|
|
private Text val2 = new Text("val2");
|
|
|
|
|
|
-
|
|
|
+ /**
|
|
|
+ * AM with 2 maps and 1 reduce. For 1st map, one attempt fails, one attempt
|
|
|
+ * completely disappears because of failed launch, one attempt gets killed and
|
|
|
+ * one attempt succeeds. AM crashes after the first tasks finishes and
|
|
|
+ * recovers completely and succeeds in the second generation.
|
|
|
+ *
|
|
|
+ * @throws Exception
|
|
|
+ */
|
|
|
@Test
|
|
|
public void testCrashed() throws Exception {
|
|
|
|
|
@@ -112,7 +121,8 @@ public class TestRecovery {
|
|
|
// reduces must be in NEW state
|
|
|
Assert.assertEquals("Reduce Task state not correct",
|
|
|
TaskState.RUNNING, reduceTask.getReport().getTaskState());
|
|
|
-
|
|
|
+
|
|
|
+ /////////// Play some games with the TaskAttempts of the first task //////
|
|
|
//send the fail signal to the 1st map task attempt
|
|
|
app.getContext().getEventHandler().handle(
|
|
|
new TaskAttemptEvent(
|
|
@@ -120,42 +130,68 @@ public class TestRecovery {
|
|
|
TaskAttemptEventType.TA_FAILMSG));
|
|
|
|
|
|
app.waitForState(task1Attempt1, TaskAttemptState.FAILED);
|
|
|
-
|
|
|
- while (mapTask1.getAttempts().size() != 2) {
|
|
|
+
|
|
|
+ int timeOut = 0;
|
|
|
+ while (mapTask1.getAttempts().size() != 2 && timeOut++ < 10) {
|
|
|
Thread.sleep(2000);
|
|
|
LOG.info("Waiting for next attempt to start");
|
|
|
}
|
|
|
+ Assert.assertEquals(2, mapTask1.getAttempts().size());
|
|
|
Iterator<TaskAttempt> itr = mapTask1.getAttempts().values().iterator();
|
|
|
itr.next();
|
|
|
TaskAttempt task1Attempt2 = itr.next();
|
|
|
|
|
|
- app.waitForState(task1Attempt2, TaskAttemptState.RUNNING);
|
|
|
+ // This attempt will automatically fail because of the way ContainerLauncher
|
|
|
+ // is setup
|
|
|
+ // This attempt 'disappears' from JobHistory and so causes MAPREDUCE-3846
|
|
|
+ app.getContext().getEventHandler().handle(
|
|
|
+ new TaskAttemptEvent(task1Attempt2.getID(),
|
|
|
+ TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED));
|
|
|
+ app.waitForState(task1Attempt2, TaskAttemptState.FAILED);
|
|
|
+
|
|
|
+ timeOut = 0;
|
|
|
+ while (mapTask1.getAttempts().size() != 3 && timeOut++ < 10) {
|
|
|
+ Thread.sleep(2000);
|
|
|
+ LOG.info("Waiting for next attempt to start");
|
|
|
+ }
|
|
|
+ Assert.assertEquals(3, mapTask1.getAttempts().size());
|
|
|
+ itr = mapTask1.getAttempts().values().iterator();
|
|
|
+ itr.next();
|
|
|
+ itr.next();
|
|
|
+ TaskAttempt task1Attempt3 = itr.next();
|
|
|
+
|
|
|
+ app.waitForState(task1Attempt3, TaskAttemptState.RUNNING);
|
|
|
|
|
|
- //send the kill signal to the 1st map 2nd attempt
|
|
|
+ //send the kill signal to the 1st map 3rd attempt
|
|
|
app.getContext().getEventHandler().handle(
|
|
|
new TaskAttemptEvent(
|
|
|
- task1Attempt2.getID(),
|
|
|
+ task1Attempt3.getID(),
|
|
|
TaskAttemptEventType.TA_KILL));
|
|
|
|
|
|
- app.waitForState(task1Attempt2, TaskAttemptState.KILLED);
|
|
|
-
|
|
|
- while (mapTask1.getAttempts().size() != 3) {
|
|
|
+ app.waitForState(task1Attempt3, TaskAttemptState.KILLED);
|
|
|
+
|
|
|
+ timeOut = 0;
|
|
|
+ while (mapTask1.getAttempts().size() != 4 && timeOut++ < 10) {
|
|
|
Thread.sleep(2000);
|
|
|
LOG.info("Waiting for next attempt to start");
|
|
|
}
|
|
|
+ Assert.assertEquals(4, mapTask1.getAttempts().size());
|
|
|
itr = mapTask1.getAttempts().values().iterator();
|
|
|
itr.next();
|
|
|
itr.next();
|
|
|
- TaskAttempt task1Attempt3 = itr.next();
|
|
|
+ itr.next();
|
|
|
+ TaskAttempt task1Attempt4 = itr.next();
|
|
|
|
|
|
- app.waitForState(task1Attempt3, TaskAttemptState.RUNNING);
|
|
|
+ app.waitForState(task1Attempt4, TaskAttemptState.RUNNING);
|
|
|
|
|
|
- //send the done signal to the 1st map 3rd attempt
|
|
|
+ //send the done signal to the 1st map 4th attempt
|
|
|
app.getContext().getEventHandler().handle(
|
|
|
new TaskAttemptEvent(
|
|
|
- task1Attempt3.getID(),
|
|
|
+ task1Attempt4.getID(),
|
|
|
TaskAttemptEventType.TA_DONE));
|
|
|
|
|
|
+ /////////// End of games with the TaskAttempts of the first task //////
|
|
|
+
|
|
|
//wait for first map task to complete
|
|
|
app.waitForState(mapTask1, TaskState.SUCCEEDED);
|
|
|
long task1StartTime = mapTask1.getReport().getStartTime();
|
|
@@ -241,6 +277,136 @@ public class TestRecovery {
|
|
|
// available in the failed attempt should be available here
|
|
|
}
|
|
|
|
|
|
+ @Test
|
|
|
+ public void testMultipleCrashes() throws Exception {
|
|
|
+
|
|
|
+ int runCount = 0;
|
|
|
+ MRApp app =
|
|
|
+ new MRAppWithHistory(2, 1, false, this.getClass().getName(), true,
|
|
|
+ ++runCount);
|
|
|
+ Configuration conf = new Configuration();
|
|
|
+ conf.setBoolean("mapred.mapper.new-api", true);
|
|
|
+ conf.setBoolean("mapred.reducer.new-api", true);
|
|
|
+ conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
|
|
|
+ conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
|
|
|
+ Job job = app.submit(conf);
|
|
|
+ app.waitForState(job, JobState.RUNNING);
|
|
|
+ //all maps would be running
|
|
|
+ Assert.assertEquals("No of tasks not correct",
|
|
|
+ 3, job.getTasks().size());
|
|
|
+ Iterator<Task> it = job.getTasks().values().iterator();
|
|
|
+ Task mapTask1 = it.next();
|
|
|
+ Task mapTask2 = it.next();
|
|
|
+ Task reduceTask = it.next();
|
|
|
+
|
|
|
+ // all maps must be running
|
|
|
+ app.waitForState(mapTask1, TaskState.RUNNING);
|
|
|
+ app.waitForState(mapTask2, TaskState.RUNNING);
|
|
|
+
|
|
|
+ TaskAttempt task1Attempt1 = mapTask1.getAttempts().values().iterator().next();
|
|
|
+ TaskAttempt task2Attempt = mapTask2.getAttempts().values().iterator().next();
|
|
|
+
|
|
|
+ //before sending the TA_DONE, event make sure attempt has come to
|
|
|
+ //RUNNING state
|
|
|
+ app.waitForState(task1Attempt1, TaskAttemptState.RUNNING);
|
|
|
+ app.waitForState(task2Attempt, TaskAttemptState.RUNNING);
|
|
|
+
|
|
|
+ // reduces must be in NEW state
|
|
|
+ Assert.assertEquals("Reduce Task state not correct",
|
|
|
+ TaskState.RUNNING, reduceTask.getReport().getTaskState());
|
|
|
+
|
|
|
+ //send the done signal to the 1st map
|
|
|
+ app.getContext().getEventHandler().handle(
|
|
|
+ new TaskAttemptEvent(
|
|
|
+ task1Attempt1.getID(),
|
|
|
+ TaskAttemptEventType.TA_DONE));
|
|
|
+
|
|
|
+ //wait for first map task to complete
|
|
|
+ app.waitForState(mapTask1, TaskState.SUCCEEDED);
|
|
|
+
|
|
|
+ // Crash the app
|
|
|
+ app.stop();
|
|
|
+
|
|
|
+ //rerun
|
|
|
+ //in rerun the 1st map will be recovered from previous run
|
|
|
+ app =
|
|
|
+ new MRAppWithHistory(2, 1, false, this.getClass().getName(), false,
|
|
|
+ ++runCount);
|
|
|
+ conf = new Configuration();
|
|
|
+ conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true);
|
|
|
+ conf.setBoolean("mapred.mapper.new-api", true);
|
|
|
+ conf.setBoolean("mapred.reducer.new-api", true);
|
|
|
+ conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
|
|
|
+ conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
|
|
|
+ job = app.submit(conf);
|
|
|
+ app.waitForState(job, JobState.RUNNING);
|
|
|
+ //all maps would be running
|
|
|
+ Assert.assertEquals("No of tasks not correct",
|
|
|
+ 3, job.getTasks().size());
|
|
|
+ it = job.getTasks().values().iterator();
|
|
|
+ mapTask1 = it.next();
|
|
|
+ mapTask2 = it.next();
|
|
|
+ reduceTask = it.next();
|
|
|
+
|
|
|
+ // first map will be recovered, no need to send done
|
|
|
+ app.waitForState(mapTask1, TaskState.SUCCEEDED);
|
|
|
+
|
|
|
+ app.waitForState(mapTask2, TaskState.RUNNING);
|
|
|
+
|
|
|
+ task2Attempt = mapTask2.getAttempts().values().iterator().next();
|
|
|
+ //before sending the TA_DONE, event make sure attempt has come to
|
|
|
+ //RUNNING state
|
|
|
+ app.waitForState(task2Attempt, TaskAttemptState.RUNNING);
|
|
|
+
|
|
|
+ //send the done signal to the 2nd map task
|
|
|
+ app.getContext().getEventHandler().handle(
|
|
|
+ new TaskAttemptEvent(
|
|
|
+ mapTask2.getAttempts().values().iterator().next().getID(),
|
|
|
+ TaskAttemptEventType.TA_DONE));
|
|
|
+
|
|
|
+ //wait to get it completed
|
|
|
+ app.waitForState(mapTask2, TaskState.SUCCEEDED);
|
|
|
+
|
|
|
+ // Crash the app again.
|
|
|
+ app.stop();
|
|
|
+
|
|
|
+ //rerun
|
|
|
+ //in rerun the 1st and 2nd map will be recovered from previous run
|
|
|
+ app =
|
|
|
+ new MRAppWithHistory(2, 1, false, this.getClass().getName(), false,
|
|
|
+ ++runCount);
|
|
|
+ conf = new Configuration();
|
|
|
+ conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true);
|
|
|
+ conf.setBoolean("mapred.mapper.new-api", true);
|
|
|
+ conf.setBoolean("mapred.reducer.new-api", true);
|
|
|
+ conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
|
|
|
+ conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
|
|
|
+ job = app.submit(conf);
|
|
|
+ app.waitForState(job, JobState.RUNNING);
|
|
|
+ //all maps would be running
|
|
|
+ Assert.assertEquals("No of tasks not correct",
|
|
|
+ 3, job.getTasks().size());
|
|
|
+ it = job.getTasks().values().iterator();
|
|
|
+ mapTask1 = it.next();
|
|
|
+ mapTask2 = it.next();
|
|
|
+ reduceTask = it.next();
|
|
|
+
|
|
|
+ // The maps will be recovered, no need to send done
|
|
|
+ app.waitForState(mapTask1, TaskState.SUCCEEDED);
|
|
|
+ app.waitForState(mapTask2, TaskState.SUCCEEDED);
|
|
|
+
|
|
|
+ //wait for reduce to be running before sending done
|
|
|
+ app.waitForState(reduceTask, TaskState.RUNNING);
|
|
|
+ //send the done signal to the reduce
|
|
|
+ app.getContext().getEventHandler().handle(
|
|
|
+ new TaskAttemptEvent(
|
|
|
+ reduceTask.getAttempts().values().iterator().next().getID(),
|
|
|
+ TaskAttemptEventType.TA_DONE));
|
|
|
+
|
|
|
+ app.waitForState(job, JobState.SUCCEEDED);
|
|
|
+ app.verifyCompleted();
|
|
|
+ }
|
|
|
+
|
|
|
@Test
|
|
|
public void testOutputRecovery() throws Exception {
|
|
|
int runCount = 0;
|
|
@@ -552,7 +718,7 @@ public class TestRecovery {
|
|
|
}
|
|
|
|
|
|
|
|
|
- class MRAppWithHistory extends MRApp {
|
|
|
+ static class MRAppWithHistory extends MRApp {
|
|
|
public MRAppWithHistory(int maps, int reduces, boolean autoComplete,
|
|
|
String testName, boolean cleanOnStart, int startCount) {
|
|
|
super(maps, reduces, autoComplete, testName, cleanOnStart, startCount);
|
|
@@ -567,7 +733,17 @@ public class TestRecovery {
|
|
|
|
|
|
@Override
|
|
|
protected ContainerLauncher createContainerLauncher(AppContext context) {
|
|
|
- MockContainerLauncher launcher = new MockContainerLauncher();
|
|
|
+ MockContainerLauncher launcher = new MockContainerLauncher() {
|
|
|
+ @Override
|
|
|
+ public void handle(ContainerLauncherEvent event) {
|
|
|
+ TaskAttemptId taskAttemptID = event.getTaskAttemptID();
|
|
|
+ // Pass everything except the 2nd attempt of the first task.
|
|
|
+ if (taskAttemptID.getId() != 1
|
|
|
+ || taskAttemptID.getTaskId().getId() != 0) {
|
|
|
+ super.handle(event);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ };
|
|
|
launcher.shufflePort = 5467;
|
|
|
return launcher;
|
|
|
}
|
|
@@ -581,7 +757,7 @@ public class TestRecovery {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- class RecoveryServiceWithCustomDispatcher extends RecoveryService {
|
|
|
+ static class RecoveryServiceWithCustomDispatcher extends RecoveryService {
|
|
|
|
|
|
public RecoveryServiceWithCustomDispatcher(
|
|
|
ApplicationAttemptId applicationAttemptId, Clock clock,
|