فهرست منبع

MAPREDUCE-7166. map-only job should ignore node lost event when task is already succeeded. Contributed by Lei Li.

Akira Ajisaka 6 سال پیش
والد
کامیت
499c70eda5

+ 15 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java

@@ -2196,6 +2196,14 @@ public abstract class TaskAttemptImpl implements
                   taskAttempt.getID().toString());
         return TaskAttemptStateInternal.SUCCEEDED;
       }
+      if (taskAttempt.getID().getTaskId().getTaskType() == TaskType.MAP
+          && taskAttempt.conf.getNumReduceTasks() == 0) {
+        // same reason as above for map only job after map task has succeeded.
+        // ignore this for map only tasks
+        LOG.info("Ignoring killed event for successful map only task attempt" +
+            taskAttempt.getID().toString());
+        return TaskAttemptStateInternal.SUCCEEDED;
+      }
       if(event instanceof TaskAttemptKillEvent) {
         TaskAttemptKillEvent msgEvent = (TaskAttemptKillEvent) event;
         //add to diagnostic
@@ -2246,6 +2254,13 @@ public abstract class TaskAttemptImpl implements
         LOG.info("Ignoring killed event for successful reduce task attempt" +
             taskAttempt.getID().toString());
         return TaskAttemptStateInternal.SUCCESS_CONTAINER_CLEANUP;
+      } else if (taskAttempt.getID().getTaskId().getTaskType() == TaskType.MAP
+          && taskAttempt.conf.getNumReduceTasks() == 0) {
+        // same reason as above for map only job after map task has succeeded.
+        // ignore this for map only tasks
+        LOG.info("Ignoring killed event for successful map only task attempt" +
+            taskAttempt.getID().toString());
+        return TaskAttemptStateInternal.SUCCESS_CONTAINER_CLEANUP;
       } else {
         // Store reschedule flag so that after clean up is completed, new
         // attempt is scheduled/rescheduled based on it.

+ 80 - 4
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java

@@ -1322,6 +1322,42 @@ public class TestTaskAttempt{
     assertFalse("InternalError occurred", eventHandler.internalError);
   }
 
+  @Test
+  public void testKillMapOnlyTaskWhileSuccessFinishing() throws Exception {
+    MockEventHandler eventHandler = new MockEventHandler();
+    TaskAttemptImpl taImpl = createMapOnlyTaskAttemptImpl(eventHandler);
+
+    taImpl.handle(new TaskAttemptEvent(taImpl.getID(),
+        TaskAttemptEventType.TA_DONE));
+
+    assertEquals("Task attempt is not in SUCCEEDED state",
+        TaskAttemptState.SUCCEEDED, taImpl.getState());
+    assertEquals("Task attempt's internal state is not " +
+            "SUCCESS_FINISHING_CONTAINER",
+        TaskAttemptStateInternal.SUCCESS_FINISHING_CONTAINER,
+        taImpl.getInternalState());
+
+    // If the map only task is killed when it is in SUCCESS_FINISHING_CONTAINER
+    // state, the state will move to SUCCESS_CONTAINER_CLEANUP
+    taImpl.handle(new TaskAttemptEvent(taImpl.getID(),
+        TaskAttemptEventType.TA_KILL));
+    assertEquals("Task attempt is not in SUCCEEDED state",
+        TaskAttemptState.SUCCEEDED, taImpl.getState());
+    assertEquals("Task attempt's internal state is not " +
+            "SUCCESS_CONTAINER_CLEANUP",
+        TaskAttemptStateInternal.SUCCESS_CONTAINER_CLEANUP,
+        taImpl.getInternalState());
+
+    taImpl.handle(new TaskAttemptEvent(taImpl.getID(),
+        TaskAttemptEventType.TA_CONTAINER_CLEANED));
+    assertEquals("Task attempt is not in SUCCEEDED state",
+        TaskAttemptState.SUCCEEDED, taImpl.getState());
+    assertEquals("Task attempt's internal state is not SUCCEEDED state",
+        TaskAttemptStateInternal.SUCCEEDED, taImpl.getInternalState());
+
+    assertFalse("InternalError occurred", eventHandler.internalError);
+  }
+
   @Test
   public void testKillMapTaskAfterSuccess() throws Exception {
     MockEventHandler eventHandler = new MockEventHandler();
@@ -1340,7 +1376,7 @@ public class TestTaskAttempt{
         TaskAttemptEventType.TA_CONTAINER_CLEANED));
     // Send a map task attempt kill event indicating next map attempt has to be
     // reschedule
-    taImpl.handle(new TaskAttemptKillEvent(taImpl.getID(),"", true));
+    taImpl.handle(new TaskAttemptKillEvent(taImpl.getID(), "", true));
     assertEquals("Task attempt is not in KILLED state", taImpl.getState(),
         TaskAttemptState.KILLED);
     assertEquals("Task attempt's internal state is not KILLED",
@@ -1353,6 +1389,34 @@ public class TestTaskAttempt{
     assertTrue(((TaskTAttemptKilledEvent)event).getRescheduleAttempt());
   }
 
+  @Test
+  public void testKillMapOnlyTaskAfterSuccess() throws Exception {
+    MockEventHandler eventHandler = new MockEventHandler();
+    TaskAttemptImpl taImpl = createMapOnlyTaskAttemptImpl(eventHandler);
+
+    taImpl.handle(new TaskAttemptEvent(taImpl.getID(),
+        TaskAttemptEventType.TA_DONE));
+
+    assertEquals("Task attempt is not in SUCCEEDED state",
+        TaskAttemptState.SUCCEEDED, taImpl.getState());
+    assertEquals("Task attempt's internal state is not " +
+            "SUCCESS_FINISHING_CONTAINER",
+        TaskAttemptStateInternal.SUCCESS_FINISHING_CONTAINER,
+        taImpl.getInternalState());
+
+    taImpl.handle(new TaskAttemptEvent(taImpl.getID(),
+        TaskAttemptEventType.TA_CONTAINER_CLEANED));
+    // Succeeded
+    taImpl.handle(new TaskAttemptKillEvent(taImpl.getID(),"", true));
+    assertEquals("Task attempt is not in SUCCEEDED state",
+        TaskAttemptState.SUCCEEDED, taImpl.getState());
+    assertEquals("Task attempt's internal state is not SUCCEEDED",
+        TaskAttemptStateInternal.SUCCEEDED, taImpl.getInternalState());
+    assertFalse("InternalError occurred", eventHandler.internalError);
+    TaskEvent event = eventHandler.lastTaskEvent;
+    assertEquals(TaskEventType.T_ATTEMPT_SUCCEEDED, event.getType());
+  }
+
   @Test
   public void testKillMapTaskWhileFailFinishing() throws Exception {
     MockEventHandler eventHandler = new MockEventHandler();
@@ -1765,8 +1829,8 @@ public class TestTaskAttempt{
         thenReturn(taskAttemptFinishingMonitor);
   }
 
-  private TaskAttemptImpl createTaskAttemptImpl(
-      MockEventHandler eventHandler) {
+  private TaskAttemptImpl createCommonTaskAttemptImpl(
+      MockEventHandler eventHandler, JobConf jobConf) {
     ApplicationId appId = ApplicationId.newInstance(1, 2);
     ApplicationAttemptId appAttemptId =
         ApplicationAttemptId.newInstance(appId, 0);
@@ -1778,7 +1842,6 @@ public class TestTaskAttempt{
     TaskAttemptListener taListener = mock(TaskAttemptListener.class);
     when(taListener.getAddress()).thenReturn(new InetSocketAddress("localhost", 0));
 
-    JobConf jobConf = new JobConf();
     jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
     jobConf.setBoolean("fs.file.impl.disable.cache", true);
     jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, "");
@@ -1813,6 +1876,19 @@ public class TestTaskAttempt{
     return taImpl;
   }
 
+  private TaskAttemptImpl createTaskAttemptImpl(
+      MockEventHandler eventHandler) {
+    JobConf jobConf = new JobConf();
+    return createCommonTaskAttemptImpl(eventHandler, jobConf);
+  }
+
+  private TaskAttemptImpl createMapOnlyTaskAttemptImpl(
+      MockEventHandler eventHandler) {
+    JobConf jobConf = new JobConf();
+    jobConf.setInt(MRJobConfig.NUM_REDUCES, 0);
+    return createCommonTaskAttemptImpl(eventHandler, jobConf);
+  }
+
   public static class MockEventHandler implements EventHandler {
     public boolean internalError;
     public TaskEvent lastTaskEvent;