|
@@ -112,6 +112,57 @@ public class TestTaskAttempt{
|
|
|
testMRAppHistory(app);
|
|
|
}
|
|
|
|
|
|
+ @Test
|
|
|
+ public void testMRAppHistoryForTAFailedInAssigned() throws Exception {
|
|
|
+ // test TA_CONTAINER_LAUNCH_FAILED for map
|
|
|
+ FailingAttemptsDuringAssignedMRApp app =
|
|
|
+ new FailingAttemptsDuringAssignedMRApp(1, 0,
|
|
|
+ TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED);
|
|
|
+ testTaskAttemptAssignedFailHistory(app);
|
|
|
+
|
|
|
+ // test TA_CONTAINER_LAUNCH_FAILED for reduce
|
|
|
+ app =
|
|
|
+ new FailingAttemptsDuringAssignedMRApp(0, 1,
|
|
|
+ TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED);
|
|
|
+ testTaskAttemptAssignedFailHistory(app);
|
|
|
+
|
|
|
+ // test TA_CONTAINER_COMPLETED for map
|
|
|
+ app =
|
|
|
+ new FailingAttemptsDuringAssignedMRApp(1, 0,
|
|
|
+ TaskAttemptEventType.TA_CONTAINER_COMPLETED);
|
|
|
+ testTaskAttemptAssignedFailHistory(app);
|
|
|
+
|
|
|
+ // test TA_CONTAINER_COMPLETED for reduce
|
|
|
+ app =
|
|
|
+ new FailingAttemptsDuringAssignedMRApp(0, 1,
|
|
|
+ TaskAttemptEventType.TA_CONTAINER_COMPLETED);
|
|
|
+ testTaskAttemptAssignedFailHistory(app);
|
|
|
+
|
|
|
+ // test TA_FAILMSG for map
|
|
|
+ app =
|
|
|
+ new FailingAttemptsDuringAssignedMRApp(1, 0,
|
|
|
+ TaskAttemptEventType.TA_FAILMSG);
|
|
|
+ testTaskAttemptAssignedFailHistory(app);
|
|
|
+
|
|
|
+ // test TA_FAILMSG for reduce
|
|
|
+ app =
|
|
|
+ new FailingAttemptsDuringAssignedMRApp(0, 1,
|
|
|
+ TaskAttemptEventType.TA_FAILMSG);
|
|
|
+ testTaskAttemptAssignedFailHistory(app);
|
|
|
+
|
|
|
+ // test TA_KILL for map
|
|
|
+ app =
|
|
|
+ new FailingAttemptsDuringAssignedMRApp(1, 0,
|
|
|
+ TaskAttemptEventType.TA_KILL);
|
|
|
+ testTaskAttemptAssignedKilledHistory(app);
|
|
|
+
|
|
|
+ // test TA_KILL for reduce
|
|
|
+ app =
|
|
|
+ new FailingAttemptsDuringAssignedMRApp(0, 1,
|
|
|
+ TaskAttemptEventType.TA_KILL);
|
|
|
+ testTaskAttemptAssignedKilledHistory(app);
|
|
|
+ }
|
|
|
+
|
|
|
@Test
|
|
|
public void testSingleRackRequest() throws Exception {
|
|
|
TaskAttemptImpl.RequestContainerTransition rct =
|
|
@@ -299,6 +350,31 @@ public class TestTaskAttempt{
|
|
|
report.getTaskAttemptState());
|
|
|
}
|
|
|
|
|
|
+ private void testTaskAttemptAssignedFailHistory
|
|
|
+ (FailingAttemptsDuringAssignedMRApp app) throws Exception {
|
|
|
+ Configuration conf = new Configuration();
|
|
|
+ Job job = app.submit(conf);
|
|
|
+ app.waitForState(job, JobState.FAILED);
|
|
|
+ Map<TaskId, Task> tasks = job.getTasks();
|
|
|
+ Assert.assertTrue("No Ta Started JH Event", app.getTaStartJHEvent());
|
|
|
+ Assert.assertTrue("No Ta Failed JH Event", app.getTaFailedJHEvent());
|
|
|
+ }
|
|
|
+
|
|
|
+ private void testTaskAttemptAssignedKilledHistory
|
|
|
+ (FailingAttemptsDuringAssignedMRApp app) throws Exception {
|
|
|
+ Configuration conf = new Configuration();
|
|
|
+ Job job = app.submit(conf);
|
|
|
+ app.waitForState(job, JobState.RUNNING);
|
|
|
+ Map<TaskId, Task> tasks = job.getTasks();
|
|
|
+ Task task = tasks.values().iterator().next();
|
|
|
+ app.waitForState(task, TaskState.SCHEDULED);
|
|
|
+ Map<TaskAttemptId, TaskAttempt> attempts = task.getAttempts();
|
|
|
+ TaskAttempt attempt = attempts.values().iterator().next();
|
|
|
+ app.waitForState(attempt, TaskAttemptState.KILLED);
|
|
|
+ Assert.assertTrue("No Ta Started JH Event", app.getTaStartJHEvent());
|
|
|
+ Assert.assertTrue("No Ta Killed JH Event", app.getTaKilledJHEvent());
|
|
|
+ }
|
|
|
+
|
|
|
static class FailingAttemptsMRApp extends MRApp {
|
|
|
FailingAttemptsMRApp(int maps, int reduces) {
|
|
|
super(maps, reduces, true, "FailingAttemptsMRApp", true);
|
|
@@ -329,6 +405,72 @@ public class TestTaskAttempt{
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ static class FailingAttemptsDuringAssignedMRApp extends MRApp {
|
|
|
+ FailingAttemptsDuringAssignedMRApp(int maps, int reduces,
|
|
|
+ TaskAttemptEventType event) {
|
|
|
+ super(maps, reduces, true, "FailingAttemptsMRApp", true);
|
|
|
+ sendFailEvent = event;
|
|
|
+ }
|
|
|
+
|
|
|
+ TaskAttemptEventType sendFailEvent;
|
|
|
+
|
|
|
+ @Override
|
|
|
+ protected void containerLaunched(TaskAttemptId attemptID,
|
|
|
+ int shufflePort) {
|
|
|
+ //do nothing, not send TA_CONTAINER_LAUNCHED event
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ protected void attemptLaunched(TaskAttemptId attemptID) {
|
|
|
+ getContext().getEventHandler().handle(
|
|
|
+ new TaskAttemptEvent(attemptID, sendFailEvent));
|
|
|
+ }
|
|
|
+
|
|
|
+ private boolean receiveTaStartJHEvent = false;
|
|
|
+ private boolean receiveTaFailedJHEvent = false;
|
|
|
+ private boolean receiveTaKilledJHEvent = false;
|
|
|
+
|
|
|
+ public boolean getTaStartJHEvent(){
|
|
|
+ return receiveTaStartJHEvent;
|
|
|
+ }
|
|
|
+
|
|
|
+ public boolean getTaFailedJHEvent(){
|
|
|
+ return receiveTaFailedJHEvent;
|
|
|
+ }
|
|
|
+
|
|
|
+ public boolean getTaKilledJHEvent(){
|
|
|
+ return receiveTaKilledJHEvent;
|
|
|
+ }
|
|
|
+
|
|
|
+ protected EventHandler<JobHistoryEvent> createJobHistoryHandler(
|
|
|
+ AppContext context) {
|
|
|
+ return new EventHandler<JobHistoryEvent>() {
|
|
|
+ @Override
|
|
|
+ public void handle(JobHistoryEvent event) {
|
|
|
+ if (event.getType() == org.apache.hadoop.mapreduce.jobhistory.
|
|
|
+ EventType.MAP_ATTEMPT_FAILED) {
|
|
|
+ receiveTaFailedJHEvent = true;
|
|
|
+ } else if (event.getType() == org.apache.hadoop.mapreduce.
|
|
|
+ jobhistory.EventType.MAP_ATTEMPT_KILLED) {
|
|
|
+ receiveTaKilledJHEvent = true;
|
|
|
+ } else if (event.getType() == org.apache.hadoop.mapreduce.
|
|
|
+ jobhistory.EventType.MAP_ATTEMPT_STARTED) {
|
|
|
+ receiveTaStartJHEvent = true;
|
|
|
+ } else if (event.getType() == org.apache.hadoop.mapreduce.
|
|
|
+ jobhistory.EventType.REDUCE_ATTEMPT_FAILED) {
|
|
|
+ receiveTaFailedJHEvent = true;
|
|
|
+ } else if (event.getType() == org.apache.hadoop.mapreduce.
|
|
|
+ jobhistory.EventType.REDUCE_ATTEMPT_KILLED) {
|
|
|
+ receiveTaKilledJHEvent = true;
|
|
|
+ } else if (event.getType() == org.apache.hadoop.mapreduce.
|
|
|
+ jobhistory.EventType.REDUCE_ATTEMPT_STARTED) {
|
|
|
+ receiveTaStartJHEvent = true;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ };
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
@Test
|
|
|
public void testLaunchFailedWhileKilling() throws Exception {
|
|
|
ApplicationId appId = ApplicationId.newInstance(1, 2);
|