|
@@ -203,7 +203,7 @@ public class TestJobImpl {
|
|
public void testCheckJobCompleteSuccess() throws Exception {
|
|
public void testCheckJobCompleteSuccess() throws Exception {
|
|
Configuration conf = new Configuration();
|
|
Configuration conf = new Configuration();
|
|
conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
|
|
conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
|
|
- AsyncDispatcher dispatcher = new AsyncDispatcher();
|
|
|
|
|
|
+ DrainDispatcher dispatcher = new DrainDispatcher();
|
|
dispatcher.init(conf);
|
|
dispatcher.init(conf);
|
|
dispatcher.start();
|
|
dispatcher.start();
|
|
CyclicBarrier syncBarrier = new CyclicBarrier(2);
|
|
CyclicBarrier syncBarrier = new CyclicBarrier(2);
|
|
@@ -225,6 +225,11 @@ public class TestJobImpl {
|
|
JobEventType.JOB_MAP_TASK_RESCHEDULED));
|
|
JobEventType.JOB_MAP_TASK_RESCHEDULED));
|
|
assertJobState(job, JobStateInternal.COMMITTING);
|
|
assertJobState(job, JobStateInternal.COMMITTING);
|
|
|
|
|
|
|
|
+ job.handle(new JobEvent(job.getID(),
|
|
|
|
+ JobEventType.JOB_TASK_COMPLETED));
|
|
|
|
+ dispatcher.await();
|
|
|
|
+ assertJobState(job, JobStateInternal.COMMITTING);
|
|
|
|
+
|
|
// let the committer complete and verify the job succeeds
|
|
// let the committer complete and verify the job succeeds
|
|
syncBarrier.await();
|
|
syncBarrier.await();
|
|
assertJobState(job, JobStateInternal.SUCCEEDED);
|
|
assertJobState(job, JobStateInternal.SUCCEEDED);
|
|
@@ -236,6 +241,11 @@ public class TestJobImpl {
|
|
job.handle(new JobEvent(job.getID(),
|
|
job.handle(new JobEvent(job.getID(),
|
|
JobEventType.JOB_MAP_TASK_RESCHEDULED));
|
|
JobEventType.JOB_MAP_TASK_RESCHEDULED));
|
|
assertJobState(job, JobStateInternal.SUCCEEDED);
|
|
assertJobState(job, JobStateInternal.SUCCEEDED);
|
|
|
|
+
|
|
|
|
+ job.handle(new JobEvent(job.getID(),
|
|
|
|
+ JobEventType.JOB_TASK_COMPLETED));
|
|
|
|
+ dispatcher.await();
|
|
|
|
+ assertJobState(job, JobStateInternal.SUCCEEDED);
|
|
|
|
|
|
dispatcher.stop();
|
|
dispatcher.stop();
|
|
commitHandler.stop();
|
|
commitHandler.stop();
|