|
@@ -418,6 +418,21 @@ public class TestTaskImpl {
|
|
|
killRunningTaskAttempt(getLastAttempt().getAttemptId());
|
|
|
}
|
|
|
|
|
|
+ @Test
|
|
|
+ public void testKillSuccessfulTask() {
|
|
|
+ LOG.info("--- START: testKillSuccesfulTask ---");
|
|
|
+ mockTask = createMockTask(TaskType.MAP);
|
|
|
+ TaskId taskId = getNewTaskID();
|
|
|
+ scheduleTaskAttempt(taskId);
|
|
|
+ launchTaskAttempt(getLastAttempt().getAttemptId());
|
|
|
+ commitTaskAttempt(getLastAttempt().getAttemptId());
|
|
|
+ mockTask.handle(new TaskTAttemptEvent(getLastAttempt().getAttemptId(),
|
|
|
+ TaskEventType.T_ATTEMPT_SUCCEEDED));
|
|
|
+ assertTaskSucceededState();
|
|
|
+ mockTask.handle(new TaskEvent(taskId, TaskEventType.T_KILL));
|
|
|
+ assertTaskSucceededState();
|
|
|
+ }
|
|
|
+
|
|
|
@Test
|
|
|
public void testTaskProgress() {
|
|
|
LOG.info("--- START: testTaskProgress ---");
|
|
@@ -485,7 +500,8 @@ public class TestTaskImpl {
|
|
|
assertTaskSucceededState();
|
|
|
}
|
|
|
|
|
|
- private void runSpeculativeTaskAttemptSucceedsEvenIfFirstFails(TaskEventType failEvent) {
|
|
|
+ private void runSpeculativeTaskAttemptSucceeds(
|
|
|
+ TaskEventType firstAttemptFinishEvent) {
|
|
|
TaskId taskId = getNewTaskID();
|
|
|
scheduleTaskAttempt(taskId);
|
|
|
launchTaskAttempt(getLastAttempt().getAttemptId());
|
|
@@ -502,9 +518,9 @@ public class TestTaskImpl {
|
|
|
// The task should now have succeeded
|
|
|
assertTaskSucceededState();
|
|
|
|
|
|
- // Now fail the first task attempt, after the second has succeeded
|
|
|
+ // Now complete the first task attempt, after the second has succeeded
|
|
|
mockTask.handle(new TaskTAttemptEvent(taskAttempts.get(0).getAttemptId(),
|
|
|
- failEvent));
|
|
|
+ firstAttemptFinishEvent));
|
|
|
|
|
|
// The task should still be in the succeeded state
|
|
|
assertTaskSucceededState();
|
|
@@ -513,25 +529,36 @@ public class TestTaskImpl {
|
|
|
@Test
|
|
|
public void testMapSpeculativeTaskAttemptSucceedsEvenIfFirstFails() {
|
|
|
mockTask = createMockTask(TaskType.MAP);
|
|
|
- runSpeculativeTaskAttemptSucceedsEvenIfFirstFails(TaskEventType.T_ATTEMPT_FAILED);
|
|
|
+ runSpeculativeTaskAttemptSucceeds(TaskEventType.T_ATTEMPT_FAILED);
|
|
|
}
|
|
|
|
|
|
@Test
|
|
|
public void testReduceSpeculativeTaskAttemptSucceedsEvenIfFirstFails() {
|
|
|
mockTask = createMockTask(TaskType.REDUCE);
|
|
|
- runSpeculativeTaskAttemptSucceedsEvenIfFirstFails(TaskEventType.T_ATTEMPT_FAILED);
|
|
|
+ runSpeculativeTaskAttemptSucceeds(TaskEventType.T_ATTEMPT_FAILED);
|
|
|
}
|
|
|
|
|
|
@Test
|
|
|
public void testMapSpeculativeTaskAttemptSucceedsEvenIfFirstIsKilled() {
|
|
|
mockTask = createMockTask(TaskType.MAP);
|
|
|
- runSpeculativeTaskAttemptSucceedsEvenIfFirstFails(TaskEventType.T_ATTEMPT_KILLED);
|
|
|
+ runSpeculativeTaskAttemptSucceeds(TaskEventType.T_ATTEMPT_KILLED);
|
|
|
}
|
|
|
|
|
|
@Test
|
|
|
public void testReduceSpeculativeTaskAttemptSucceedsEvenIfFirstIsKilled() {
|
|
|
mockTask = createMockTask(TaskType.REDUCE);
|
|
|
- runSpeculativeTaskAttemptSucceedsEvenIfFirstFails(TaskEventType.T_ATTEMPT_KILLED);
|
|
|
+ runSpeculativeTaskAttemptSucceeds(TaskEventType.T_ATTEMPT_KILLED);
|
|
|
}
|
|
|
|
|
|
+ @Test
|
|
|
+ public void testMultipleTaskAttemptsSucceed() {
|
|
|
+ mockTask = createMockTask(TaskType.MAP);
|
|
|
+ runSpeculativeTaskAttemptSucceeds(TaskEventType.T_ATTEMPT_SUCCEEDED);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Test
|
|
|
+ public void testCommitAfterSucceeds() {
|
|
|
+ mockTask = createMockTask(TaskType.REDUCE);
|
|
|
+ runSpeculativeTaskAttemptSucceeds(TaskEventType.T_ATTEMPT_COMMIT_PENDING);
|
|
|
+ }
|
|
|
}
|