|
@@ -204,7 +204,7 @@ public class TestJobImpl {
|
|
|
public void testCheckJobCompleteSuccess() throws Exception {
|
|
|
Configuration conf = new Configuration();
|
|
|
conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
|
|
|
- AsyncDispatcher dispatcher = new AsyncDispatcher();
|
|
|
+ DrainDispatcher dispatcher = new DrainDispatcher();
|
|
|
dispatcher.init(conf);
|
|
|
dispatcher.start();
|
|
|
CyclicBarrier syncBarrier = new CyclicBarrier(2);
|
|
@@ -226,6 +226,11 @@ public class TestJobImpl {
|
|
|
JobEventType.JOB_MAP_TASK_RESCHEDULED));
|
|
|
assertJobState(job, JobStateInternal.COMMITTING);
|
|
|
|
|
|
+ job.handle(new JobEvent(job.getID(),
|
|
|
+ JobEventType.JOB_TASK_COMPLETED));
|
|
|
+ dispatcher.await();
|
|
|
+ assertJobState(job, JobStateInternal.COMMITTING);
|
|
|
+
|
|
|
// let the committer complete and verify the job succeeds
|
|
|
syncBarrier.await();
|
|
|
assertJobState(job, JobStateInternal.SUCCEEDED);
|
|
@@ -237,6 +242,11 @@ public class TestJobImpl {
|
|
|
job.handle(new JobEvent(job.getID(),
|
|
|
JobEventType.JOB_MAP_TASK_RESCHEDULED));
|
|
|
assertJobState(job, JobStateInternal.SUCCEEDED);
|
|
|
+
|
|
|
+ job.handle(new JobEvent(job.getID(),
|
|
|
+ JobEventType.JOB_TASK_COMPLETED));
|
|
|
+ dispatcher.await();
|
|
|
+ assertJobState(job, JobStateInternal.SUCCEEDED);
|
|
|
|
|
|
dispatcher.stop();
|
|
|
commitHandler.stop();
|