|
@@ -1773,6 +1773,38 @@ public class TestTaskAttempt{
|
|
createReduceTaskAttemptImplForTest(eventHandler, clock, jobConf);
|
|
createReduceTaskAttemptImplForTest(eventHandler, clock, jobConf);
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ @Test
|
|
|
|
+ public void testTooManyFetchFailureWhileContainerCleanup() {
|
|
|
|
+ MockEventHandler eventHandler = new MockEventHandler();
|
|
|
|
+ TaskAttemptImpl taImpl = createTaskAttemptImpl(eventHandler);
|
|
|
|
+ TaskId reducetaskId = MRBuilderUtils.newTaskId(taImpl.getID().getTaskId()
|
|
|
|
+ .getJobId(), 1, TaskType.REDUCE);
|
|
|
|
+ TaskAttemptId reduceTAId =
|
|
|
|
+ MRBuilderUtils.newTaskAttemptId(reducetaskId, 0);
|
|
|
|
+
|
|
|
|
+ // move in two steps to the desired state (cannot get there directly)
|
|
|
|
+ taImpl.handle(new TaskAttemptEvent(taImpl.getID(),
|
|
|
|
+ TaskAttemptEventType.TA_DONE));
|
|
|
|
+ assertEquals("Task attempt's internal state is not " +
|
|
|
|
+ "SUCCESS_FINISHING_CONTAINER",
|
|
|
|
+ TaskAttemptStateInternal.SUCCESS_FINISHING_CONTAINER,
|
|
|
|
+ taImpl.getInternalState());
|
|
|
|
+
|
|
|
|
+ taImpl.handle(new TaskAttemptEvent(taImpl.getID(),
|
|
|
|
+ TaskAttemptEventType.TA_TIMED_OUT));
|
|
|
|
+ assertEquals("Task attempt's internal state is not " +
|
|
|
|
+ "SUCCESS_CONTAINER_CLEANUP",
|
|
|
|
+ TaskAttemptStateInternal.SUCCESS_CONTAINER_CLEANUP,
|
|
|
|
+ taImpl.getInternalState());
|
|
|
|
+
|
|
|
|
+ taImpl.handle(new TaskAttemptTooManyFetchFailureEvent(taImpl.getID(),
|
|
|
|
+ reduceTAId, "Host"));
|
|
|
|
+ assertEquals("Task attempt is not in FAILED state",
|
|
|
|
+ TaskAttemptState.FAILED,
|
|
|
|
+ taImpl.getState());
|
|
|
|
+ assertFalse("InternalError occurred", eventHandler.internalError);
|
|
|
|
+ }
|
|
|
|
+
|
|
private void initResourceTypes() {
|
|
private void initResourceTypes() {
|
|
Configuration conf = new Configuration();
|
|
Configuration conf = new Configuration();
|
|
conf.set(YarnConfiguration.RM_CONFIGURATION_PROVIDER_CLASS,
|
|
conf.set(YarnConfiguration.RM_CONFIGURATION_PROVIDER_CLASS,
|