浏览代码

MAPREDUCE-7240. Fix Invalid event: TA_TOO_MANY_FETCH_FAILURE at SUCCESS_FINISHING_CONTAINER.

Contributed by Huachao and Peter Bacsko. Reviewed by Wilfred Spiegelenburg.
prabhujoseph 5 年之前
父节点
当前提交
66e58b5525

+ 8 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java

@@ -382,6 +382,10 @@ public abstract class TaskAttemptImpl implements
          TaskAttemptStateInternal.SUCCESS_FINISHING_CONTAINER,
          TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
          DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
+     .addTransition(TaskAttemptStateInternal.SUCCESS_FINISHING_CONTAINER,
+         TaskAttemptStateInternal.FAILED,
+         TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURE,
+         new TooManyFetchFailureTransition())
      // ignore-able events
      .addTransition(TaskAttemptStateInternal.SUCCESS_FINISHING_CONTAINER,
          TaskAttemptStateInternal.SUCCESS_FINISHING_CONTAINER,
@@ -2148,6 +2152,10 @@ public abstract class TaskAttemptImpl implements
     @SuppressWarnings("unchecked")
     @Override
     public void transition(TaskAttemptImpl taskAttempt, TaskAttemptEvent event) {
+      if (taskAttempt.getInternalState() ==
+          TaskAttemptStateInternal.SUCCESS_FINISHING_CONTAINER) {
+        sendContainerCleanup(taskAttempt, event);
+      }
       TaskAttemptTooManyFetchFailureEvent fetchFailureEvent =
           (TaskAttemptTooManyFetchFailureEvent) event;
       // too many fetch failure can only happen for map tasks

+ 25 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java

@@ -1782,6 +1782,31 @@ public class TestTaskAttempt{
     ResourceUtils.resetResourceTypes(conf);
   }
 
+  @Test
+  public void testTooManyFetchFailureWhileSuccessFinishing() throws Exception {
+    MockEventHandler eventHandler = new MockEventHandler();
+    TaskAttemptImpl taImpl = createTaskAttemptImpl(eventHandler);
+    TaskId reducetaskId = MRBuilderUtils.newTaskId(taImpl.getID().getTaskId()
+        .getJobId(), 1, TaskType.REDUCE);
+    TaskAttemptId reduceTAId =
+        MRBuilderUtils.newTaskAttemptId(reducetaskId, 0);
+
+    taImpl.handle(new TaskAttemptEvent(taImpl.getID(),
+        TaskAttemptEventType.TA_DONE));
+
+    assertEquals("Task attempt's internal state is not " +
+        "SUCCESS_FINISHING_CONTAINER",
+        TaskAttemptStateInternal.SUCCESS_FINISHING_CONTAINER,
+        taImpl.getInternalState());
+
+    taImpl.handle(new TaskAttemptTooManyFetchFailureEvent(taImpl.getID(),
+        reduceTAId, "Host"));
+    assertEquals("Task attempt is not in FAILED state",
+        TaskAttemptState.FAILED,
+        taImpl.getState());
+    assertFalse("InternalError occurred", eventHandler.internalError);
+  }
+
   private void setupTaskAttemptFinishingMonitor(
       EventHandler eventHandler, JobConf jobConf, AppContext appCtx) {
     TaskAttemptFinishingMonitor taskAttemptFinishingMonitor =