
MAPREDUCE-4425. Speculation + Fetch failures can lead to a hung job (jlowe via bobby)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1408411 13f79535-47bb-0310-9956-ffa450edef68
Robert Joseph Evans committed 12 years ago
Commit a1e0568d79
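
Background for the TaskImpl.java change below: once a speculative attempt wins, the task moves to SUCCEEDED, and before this patch the SUCCEEDED state simply ignored T_ATTEMPT_KILLED / T_ATTEMPT_SUCCEEDED events for the other attempt, so the finishedAttempts / inProgressAttempts sets were never updated; a later retroactive fetch failure then acted on stale bookkeeping. The following is a minimal, self-contained sketch of that bookkeeping only, not the real TaskImpl: the class name, attempt IDs and the "patched" flag are illustrative stand-ins for the fields the patch touches.

    // Minimal sketch (NOT the real TaskImpl): models only the
    // finishedAttempts / inProgressAttempts bookkeeping this patch adds.
    import java.util.HashSet;
    import java.util.Set;

    public class SucceededBookkeepingSketch {
      // Simplified stand-ins for TaskImpl's internal attempt sets.
      private final Set<String> inProgressAttempts = new HashSet<>();
      private final Set<String> finishedAttempts = new HashSet<>();

      void attemptLaunched(String attemptId) {
        inProgressAttempts.add(attemptId);
      }

      void attemptSucceeded(String attemptId) {
        // Task transitions to SUCCEEDED when any attempt succeeds.
        inProgressAttempts.remove(attemptId);
        finishedAttempts.add(attemptId);
      }

      // Completion event for the *other* attempt arriving while already SUCCEEDED.
      void attemptCompletedAtSucceeded(String attemptId, boolean patched) {
        if (patched) {
          // What the new AttemptCompletedAtSucceededTransition does.
          finishedAttempts.add(attemptId);
          inProgressAttempts.remove(attemptId);
        }
        // Unpatched: the event was ignored, so the attempt stayed "in progress".
      }

      public static void main(String[] args) {
        SucceededBookkeepingSketch task = new SucceededBookkeepingSketch();
        task.attemptLaunched("attempt_0");   // original attempt
        task.attemptLaunched("attempt_1");   // speculative attempt
        task.attemptSucceeded("attempt_1");  // speculation wins -> task SUCCEEDED
        task.attemptCompletedAtSucceeded("attempt_0", /* patched = */ false);
        System.out.println("in progress = " + task.inProgressAttempts);
        System.out.println("finished    = " + task.finishedAttempts);
      }
    }

With patched = false the sketch leaves attempt_0 in the in-progress set even though it has completed, which is the inconsistent state that can strand a task after a retroactive fetch failure and that the patch below prevents.
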

+ 3 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -90,6 +90,9 @@ Release 0.23.5 - UNRELEASED
     MAPREDUCE-4751. AM stuck in KILL_WAIT for days (vinodkv via bobby)
 
     MAPREDUCE-4787. TestJobMonitorAndPrint is broken (Rob Parker via bobby)
+
+    MAPREDUCE-4425. Speculation + Fetch failures can lead to a hung job (jlowe 
+    via bobby)
  
 Release 0.23.4 - UNRELEASED
 

+ 17 - 3
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java

@@ -212,15 +212,17 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
     .addTransition(TaskStateInternal.SUCCEEDED, //only possible for map tasks
         EnumSet.of(TaskStateInternal.SCHEDULED, TaskStateInternal.SUCCEEDED, TaskStateInternal.FAILED),
         TaskEventType.T_ATTEMPT_FAILED, new MapRetroactiveFailureTransition())
+    .addTransition(TaskStateInternal.SUCCEEDED, TaskStateInternal.SUCCEEDED,
+        EnumSet.of(TaskEventType.T_ATTEMPT_KILLED,
+            TaskEventType.T_ATTEMPT_SUCCEEDED),
+            new AttemptCompletedAtSucceededTransition())
     // Ignore-able transitions.
     .addTransition(
         TaskStateInternal.SUCCEEDED, TaskStateInternal.SUCCEEDED,
         EnumSet.of(TaskEventType.T_KILL,
             TaskEventType.T_ADD_SPEC_ATTEMPT,
             TaskEventType.T_ATTEMPT_COMMIT_PENDING,
-            TaskEventType.T_ATTEMPT_LAUNCHED,
-            TaskEventType.T_ATTEMPT_KILLED,
-            TaskEventType.T_ATTEMPT_SUCCEEDED))
+            TaskEventType.T_ATTEMPT_LAUNCHED))
 
     // Transitions from FAILED state        
     .addTransition(TaskStateInternal.FAILED, TaskStateInternal.FAILED,
@@ -964,6 +966,8 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
           !castEvent.getTaskAttemptID().equals(task.successfulAttempt)) {
         // don't allow a different task attempt to override a previous
         // succeeded state
+        task.finishedAttempts.add(castEvent.getTaskAttemptID());
+        task.inProgressAttempts.remove(castEvent.getTaskAttemptID());
         return TaskStateInternal.SUCCEEDED;
       }
       
@@ -993,6 +997,16 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
     }
   }
 
+  private static class AttemptCompletedAtSucceededTransition
+    implements SingleArcTransition<TaskImpl, TaskEvent> {
+    @Override
+    public void transition(TaskImpl task, TaskEvent event) {
+      TaskTAttemptEvent castEvent = (TaskTAttemptEvent) event;
+      task.finishedAttempts.add(castEvent.getTaskAttemptID());
+      task.inProgressAttempts.remove(castEvent.getTaskAttemptID());
+    }
+  }
+
   private static class KillNewTransition 
     implements SingleArcTransition<TaskImpl, TaskEvent> {
     @Override

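Reading the hunk above: the new AttemptCompletedAtSucceededTransition keeps the task in SUCCEEDED but records the killed or succeeded attempt in finishedAttempts and drops it from inProgressAttempts, and the same two lines are added on the existing guard that refuses to let a different attempt override a previous succeeded state. The new tests in TestTaskImpl.java below exercise the speculation-plus-fetch-failure scenarios that depend on this bookkeeping.
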
+ 43 - 5
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java

@@ -140,7 +140,6 @@ public class TestTaskImpl {
 
     private float progress = 0;
     private TaskAttemptState state = TaskAttemptState.NEW;
-    private TaskAttemptId attemptId;
 
     public MockTaskAttemptImpl(TaskId taskId, int id, EventHandler eventHandler,
         TaskAttemptListener taskAttemptListener, Path jobFile, int partition,
@@ -150,13 +149,10 @@ public class TestTaskImpl {
         AppContext appContext) {
       super(taskId, id, eventHandler, taskAttemptListener, jobFile, partition, conf,
           dataLocations, committer, jobToken, credentials, clock, appContext);
-      attemptId = Records.newRecord(TaskAttemptId.class);
-      attemptId.setId(id);
-      attemptId.setTaskId(taskId);
     }
 
     public TaskAttemptId getAttemptId() {
-      return attemptId;
+      return getID();
     }
     
     @Override
@@ -522,4 +518,46 @@ public class TestTaskImpl {
   public void testCommitAfterSucceeds() {
     runSpeculativeTaskAttemptSucceeds(TaskEventType.T_ATTEMPT_COMMIT_PENDING);
   }
+
+  @Test
+  public void testSpeculativeMapFetchFailure() {
+    // Setup a scenario where speculative task wins, first attempt killed
+    runSpeculativeTaskAttemptSucceeds(TaskEventType.T_ATTEMPT_KILLED);
+    assertEquals(2, taskAttempts.size());
+
+    // speculative attempt retroactively fails from fetch failures
+    mockTask.handle(new TaskTAttemptEvent(taskAttempts.get(1).getAttemptId(),
+        TaskEventType.T_ATTEMPT_FAILED));
+
+    assertTaskScheduledState();
+    assertEquals(3, taskAttempts.size());
+  }
+
+  @Test
+  public void testSpeculativeMapMultipleSucceedFetchFailure() {
+    // Setup a scenario where speculative task wins, first attempt succeeds
+    runSpeculativeTaskAttemptSucceeds(TaskEventType.T_ATTEMPT_SUCCEEDED);
+    assertEquals(2, taskAttempts.size());
+
+    // speculative attempt retroactively fails from fetch failures
+    mockTask.handle(new TaskTAttemptEvent(taskAttempts.get(1).getAttemptId(),
+        TaskEventType.T_ATTEMPT_FAILED));
+
+    assertTaskScheduledState();
+    assertEquals(3, taskAttempts.size());
+  }
+
+  @Test
+  public void testSpeculativeMapFailedFetchFailure() {
+    // Setup a scenario where speculative task wins, first attempt failed
+    runSpeculativeTaskAttemptSucceeds(TaskEventType.T_ATTEMPT_FAILED);
+    assertEquals(2, taskAttempts.size());
+
+    // speculative attempt retroactively fails from fetch failures
+    mockTask.handle(new TaskTAttemptEvent(taskAttempts.get(1).getAttemptId(),
+        TaskEventType.T_ATTEMPT_FAILED));
+
+    assertTaskScheduledState();
+    assertEquals(3, taskAttempts.size());
+  }
 }