Przeglądaj źródła

MAPREDUCE-5009. Killing the Task Attempt slated for commit does not clear the value from the Task commitAttempt member (Robert Parker via jeagles)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1447965 13f79535-47bb-0310-9956-ffa450edef68
Jonathan Turner Eagles 12 lat temu
rodzic
commit
c8f35bc3d2

+ 4 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -723,6 +723,10 @@ Release 0.23.7 - UNRELEASED
     MAPREDUCE-4992. AM hangs in RecoveryService when recovering tasks with
     MAPREDUCE-4992. AM hangs in RecoveryService when recovering tasks with
     speculative attempts (Robert Parker via jlowe)
     speculative attempts (Robert Parker via jlowe)
 
 
+    MAPREDUCE-5009. Killing the Task Attempt slated for commit does not clear
+    the value from the Task commitAttempt member (Robert Parker via jeagles)
+
+
 Release 0.23.6 - UNRELEASED
 Release 0.23.6 - UNRELEASED
 
 
   INCOMPATIBLE CHANGES
   INCOMPATIBLE CHANGES

+ 3 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java

@@ -857,6 +857,9 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
       if (task.successfulAttempt == null) {
       if (task.successfulAttempt == null) {
         task.addAndScheduleAttempt(Avataar.VIRGIN);
         task.addAndScheduleAttempt(Avataar.VIRGIN);
       }
       }
+      if ((task.commitAttempt != null) && (task.commitAttempt == taskAttemptId)) {
+    	task.commitAttempt = null;
+      }
     }
     }
   }
   }
 
 

+ 18 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java

@@ -491,7 +491,25 @@ public class TestTaskImpl {
     assert(mockTask.getProgress() == progress);
     assert(mockTask.getProgress() == progress);
         
         
   }
   }
+
   
   
+  @Test
+  public void testKillDuringTaskAttemptCommit() {
+    mockTask = createMockTask(TaskType.REDUCE);        
+    TaskId taskId = getNewTaskID();
+    scheduleTaskAttempt(taskId);
+    
+    launchTaskAttempt(getLastAttempt().getAttemptId());
+    updateLastAttemptState(TaskAttemptState.COMMIT_PENDING);
+    commitTaskAttempt(getLastAttempt().getAttemptId());
+
+    TaskAttemptId commitAttempt = getLastAttempt().getAttemptId();
+    updateLastAttemptState(TaskAttemptState.KILLED);
+    killRunningTaskAttempt(commitAttempt);
+
+    assertFalse(mockTask.canCommit(commitAttempt));
+  }
+
   @Test
   @Test
   public void testFailureDuringTaskAttemptCommit() {
   public void testFailureDuringTaskAttemptCommit() {
     mockTask = createMockTask(TaskType.MAP);        
     mockTask = createMockTask(TaskType.MAP);