Quellcode durchsuchen

MAPREDUCE-6711. JobImpl fails to handle preemption events on state COMMITTING. Contributed by Prabhu Joseph.

(cherry picked from commit 679478d0c643defb3759a2ecd3c704315edee124)
(cherry picked from commit bf87c523ab5ffd1d5a94e74639be5de7821a6c5b)
(cherry picked from commit 031ad740b934d5fe67495bec34237aad3cc83815)
Junping Du vor 8 Jahren
Ursprung
Commit
01dff41699

+ 3 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java

@@ -418,7 +418,9 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
           .addTransition(JobStateInternal.COMMITTING,
               JobStateInternal.COMMITTING,
               EnumSet.of(JobEventType.JOB_UPDATED_NODES,
-                  JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE))
+                  JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE,
+                  JobEventType.JOB_TASK_ATTEMPT_COMPLETED,
+                  JobEventType.JOB_MAP_TASK_RESCHEDULED))
 
           // Transitions from SUCCEEDED state
           .addTransition(JobStateInternal.SUCCEEDED, JobStateInternal.SUCCEEDED,

+ 8 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java

@@ -216,6 +216,14 @@ public class TestJobImpl {
     completeJobTasks(job);
     assertJobState(job, JobStateInternal.COMMITTING);
 
+    job.handle(new JobEvent(job.getID(),
+        JobEventType.JOB_TASK_ATTEMPT_COMPLETED));
+    assertJobState(job, JobStateInternal.COMMITTING);
+
+    job.handle(new JobEvent(job.getID(),
+        JobEventType.JOB_MAP_TASK_RESCHEDULED));
+    assertJobState(job, JobStateInternal.COMMITTING);
+
     // let the committer complete and verify the job succeeds
     syncBarrier.await();
     assertJobState(job, JobStateInternal.SUCCEEDED);