浏览代码

MAPREDUCE-6711. JobImpl fails to handle preemption events on state COMMITTING. Contributed by Prabhu Joseph.

(cherry picked from commit 679478d0c643defb3759a2ecd3c704315edee124)
(cherry picked from commit bf87c523ab5ffd1d5a94e74639be5de7821a6c5b)
Junping Du 8 年之前
父节点
当前提交
031ad740b9

+ 3 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java

@@ -418,7 +418,9 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
           .addTransition(JobStateInternal.COMMITTING,
               JobStateInternal.COMMITTING,
               EnumSet.of(JobEventType.JOB_UPDATED_NODES,
-                  JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE))
+                  JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE,
+                  JobEventType.JOB_TASK_ATTEMPT_COMPLETED,
+                  JobEventType.JOB_MAP_TASK_RESCHEDULED))
 
           // Transitions from SUCCEEDED state
           .addTransition(JobStateInternal.SUCCEEDED, JobStateInternal.SUCCEEDED,

+ 8 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java

@@ -218,6 +218,14 @@ public class TestJobImpl {
     completeJobTasks(job);
     assertJobState(job, JobStateInternal.COMMITTING);
 
+    job.handle(new JobEvent(job.getID(),
+        JobEventType.JOB_TASK_ATTEMPT_COMPLETED));
+    assertJobState(job, JobStateInternal.COMMITTING);
+
+    job.handle(new JobEvent(job.getID(),
+        JobEventType.JOB_MAP_TASK_RESCHEDULED));
+    assertJobState(job, JobStateInternal.COMMITTING);
+
     // let the committer complete and verify the job succeeds
     syncBarrier.await();
     assertJobState(job, JobStateInternal.SUCCEEDED);