|
@@ -531,7 +531,7 @@ public class TestJobImpl {
|
|
|
Configuration conf = new Configuration();
|
|
|
conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
|
|
|
conf.setInt(MRJobConfig.NUM_REDUCES, 1);
|
|
|
- AsyncDispatcher dispatcher = new AsyncDispatcher();
|
|
|
+ DrainDispatcher dispatcher = new DrainDispatcher();
|
|
|
dispatcher.init(conf);
|
|
|
dispatcher.start();
|
|
|
CyclicBarrier syncBarrier = new CyclicBarrier(2);
|
|
@@ -608,6 +608,7 @@ public class TestJobImpl {
|
|
|
NodeReport secondMapperNodeReport = nodeReports.get(1);
|
|
|
job.handle(new JobUpdatedNodesEvent(job.getID(),
|
|
|
Collections.singletonList(firstMapperNodeReport)));
|
|
|
+ dispatcher.await();
|
|
|
// complete the reducer
|
|
|
for (TaskId taskId: job.tasks.keySet()) {
|
|
|
if (taskId.getTaskType() == TaskType.REDUCE) {
|