|
@@ -18,6 +18,8 @@
|
|
|
package org.apache.hadoop.mapreduce.v2.app.job.impl;
|
|
|
|
|
|
import static org.junit.Assert.assertEquals;
|
|
|
+import static org.junit.Assert.assertFalse;
|
|
|
+import static org.junit.Assert.assertTrue;
|
|
|
import static org.mockito.Mockito.mock;
|
|
|
import static org.mockito.Mockito.when;
|
|
|
|
|
@@ -261,6 +263,12 @@ public class TestTaskImpl {
|
|
|
assertTaskRunningState();
|
|
|
}
|
|
|
|
|
|
+ private void commitTaskAttempt(TaskAttemptId attemptId) {
|
|
|
+ mockTask.handle(new TaskTAttemptEvent(attemptId,
|
|
|
+ TaskEventType.T_ATTEMPT_COMMIT_PENDING));
|
|
|
+ assertTaskRunningState();
|
|
|
+ }
|
|
|
+
|
|
|
private MockTaskAttemptImpl getLastAttempt() {
|
|
|
return taskAttempts.get(taskAttempts.size()-1);
|
|
|
}
|
|
@@ -279,32 +287,45 @@ public class TestTaskImpl {
|
|
|
assertTaskRunningState();
|
|
|
}
|
|
|
|
|
|
+ private void failRunningTaskAttempt(TaskAttemptId attemptId) {
|
|
|
+ mockTask.handle(new TaskTAttemptEvent(attemptId,
|
|
|
+ TaskEventType.T_ATTEMPT_FAILED));
|
|
|
+ assertTaskRunningState();
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* {@link TaskState#NEW}
|
|
|
*/
|
|
|
private void assertTaskNewState() {
|
|
|
- assertEquals(mockTask.getState(), TaskState.NEW);
|
|
|
+ assertEquals(TaskState.NEW, mockTask.getState());
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* {@link TaskState#SCHEDULED}
|
|
|
*/
|
|
|
private void assertTaskScheduledState() {
|
|
|
- assertEquals(mockTask.getState(), TaskState.SCHEDULED);
|
|
|
+ assertEquals(TaskState.SCHEDULED, mockTask.getState());
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* {@link TaskState#RUNNING}
|
|
|
*/
|
|
|
private void assertTaskRunningState() {
|
|
|
- assertEquals(mockTask.getState(), TaskState.RUNNING);
|
|
|
+ assertEquals(TaskState.RUNNING, mockTask.getState());
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* {@link TaskState#KILL_WAIT}
|
|
|
*/
|
|
|
private void assertTaskKillWaitState() {
|
|
|
- assertEquals(mockTask.getState(), TaskState.KILL_WAIT);
|
|
|
+ assertEquals(TaskState.KILL_WAIT, mockTask.getState());
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * {@link TaskState#SUCCEEDED}
|
|
|
+ */
|
|
|
+ private void assertTaskSucceededState() {
|
|
|
+ assertEquals(TaskState.SUCCEEDED, mockTask.getState());
|
|
|
}
|
|
|
|
|
|
@Test
|
|
@@ -409,5 +430,32 @@ public class TestTaskImpl {
|
|
|
assert(mockTask.getProgress() == progress);
|
|
|
|
|
|
}
|
|
|
+
|
|
|
+ @Test
|
|
|
+ public void testFailureDuringTaskAttemptCommit() {
|
|
|
+ TaskId taskId = getNewTaskID();
|
|
|
+ scheduleTaskAttempt(taskId);
|
|
|
+ launchTaskAttempt(getLastAttempt().getAttemptId());
|
|
|
+ updateLastAttemptState(TaskAttemptState.COMMIT_PENDING);
|
|
|
+ commitTaskAttempt(getLastAttempt().getAttemptId());
|
|
|
+
|
|
|
+ // During the task attempt commit there is an exception which causes
|
|
|
+ // the attempt to fail
|
|
|
+ updateLastAttemptState(TaskAttemptState.FAILED);
|
|
|
+ failRunningTaskAttempt(getLastAttempt().getAttemptId());
|
|
|
+
|
|
|
+ assertEquals(2, taskAttempts.size());
|
|
|
+ updateLastAttemptState(TaskAttemptState.SUCCEEDED);
|
|
|
+ commitTaskAttempt(getLastAttempt().getAttemptId());
|
|
|
+ mockTask.handle(new TaskTAttemptEvent(getLastAttempt().getAttemptId(),
|
|
|
+ TaskEventType.T_ATTEMPT_SUCCEEDED));
|
|
|
+
|
|
|
+ assertFalse("First attempt should not commit",
|
|
|
+ mockTask.canCommit(taskAttempts.get(0).getAttemptId()));
|
|
|
+ assertTrue("Second attempt should commit",
|
|
|
+ mockTask.canCommit(getLastAttempt().getAttemptId()));
|
|
|
+
|
|
|
+ assertTaskSucceededState();
|
|
|
+ }
|
|
|
|
|
|
}
|