Просмотр исходного кода

svn merge -c 1359747. FIXES: MAPREDUCE-4252. MR2 job never completes with 1 pending task (Tom White via bobby)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1359750 13f79535-47bb-0310-9956-ffa450edef68
Robert Joseph Evans 13 лет назад
Родитель
Сommit
576de5f2a7

+ 3 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -310,6 +310,9 @@ Release 0.23.3 - UNRELEASED
     MAPREDUCE-3728. ShuffleHandler can't access results when configured in a
     secure mode (ahmed via tucu)
 
+    MAPREDUCE-4252. MR2 job never completes with 1 pending task (Tom White via
+    bobby)
+
 Release 0.23.2 - UNRELEASED
 
   INCOMPATIBLE CHANGES

+ 12 - 2
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java

@@ -189,7 +189,7 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
 
     // Transitions from SUCCEEDED state
     .addTransition(TaskState.SUCCEEDED, //only possible for map tasks
-        EnumSet.of(TaskState.SCHEDULED, TaskState.FAILED),
+        EnumSet.of(TaskState.SCHEDULED, TaskState.SUCCEEDED, TaskState.FAILED),
         TaskEventType.T_ATTEMPT_FAILED, new MapRetroactiveFailureTransition())
     // Ignore-able transitions.
     .addTransition(
@@ -617,7 +617,7 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
     }
   }
 
-  private void internalError(TaskEventType type) {
+  protected void internalError(TaskEventType type) {
     LOG.error("Invalid event " + type + " on Task " + this.taskId);
     eventHandler.handle(new JobDiagnosticsUpdateEvent(
         this.taskId.getJobId(), "Invalid event " + type + 
@@ -893,6 +893,16 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
 
     @Override
     public TaskState transition(TaskImpl task, TaskEvent event) {
+      if (event instanceof TaskTAttemptEvent) {
+        TaskTAttemptEvent castEvent = (TaskTAttemptEvent) event;
+        if (task.getState() == TaskState.SUCCEEDED &&
+            !castEvent.getTaskAttemptID().equals(task.successfulAttempt)) {
+          // don't allow a different task attempt to override a previous
+          // succeeded state
+          return TaskState.SUCCEEDED;
+        }
+      }
+      
       //verify that this occurs only for map task
       //TODO: consider moving it to MapTaskImpl
       if (!TaskType.MAP.equals(task.getType())) {

+ 35 - 2
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java

@@ -20,6 +20,7 @@ package org.apache.hadoop.mapreduce.v2.app.job.impl;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -127,8 +128,13 @@ public class TestTaskImpl {
     @Override
     protected int getMaxAttempts() {
       return 100;
-    }    
-    
+    }
+
+    @Override
+    protected void internalError(TaskEventType type) {
+      super.internalError(type);
+      fail("Internal error: " + type);
+    }
   }
   
   private class MockTaskAttemptImpl extends TaskAttemptImpl {
@@ -462,5 +468,32 @@ public class TestTaskImpl {
 
     assertTaskSucceededState();
   }
+  
+  @Test
+  public void testSpeculativeTaskAttemptSucceedsEvenIfFirstFails() {
+    TaskId taskId = getNewTaskID();
+    scheduleTaskAttempt(taskId);
+    launchTaskAttempt(getLastAttempt().getAttemptId());
+    updateLastAttemptState(TaskAttemptState.RUNNING);
+
+    // Add a speculative task attempt that succeeds
+    mockTask.handle(new TaskTAttemptEvent(getLastAttempt().getAttemptId(), 
+        TaskEventType.T_ADD_SPEC_ATTEMPT));
+    launchTaskAttempt(getLastAttempt().getAttemptId());
+    commitTaskAttempt(getLastAttempt().getAttemptId());
+    mockTask.handle(new TaskTAttemptEvent(getLastAttempt().getAttemptId(), 
+        TaskEventType.T_ATTEMPT_SUCCEEDED));
+    
+    // The task should now have succeeded
+    assertTaskSucceededState();
+    
+    // Now fail the first task attempt, after the second has succeeded
+    mockTask.handle(new TaskTAttemptEvent(taskAttempts.get(0).getAttemptId(), 
+        TaskEventType.T_ATTEMPT_FAILED));
+    
+    // The task should still be in the succeeded state
+    assertTaskSucceededState();
+    
+  }
 
 }