|
@@ -1409,7 +1409,63 @@ public class TestRMContainerAllocator {
|
|
|
maxReduceRampupLimit, reduceSlowStart);
|
|
|
verify(allocator).rampDownReduces(anyInt());
|
|
|
}
|
|
|
+
|
|
|
+ private static class RecalculateContainerAllocator extends MyContainerAllocator {
|
|
|
+ public boolean recalculatedReduceSchedule = false;
|
|
|
+
|
|
|
+ public RecalculateContainerAllocator(MyResourceManager rm,
|
|
|
+ Configuration conf, ApplicationAttemptId appAttemptId, Job job) {
|
|
|
+ super(rm, conf, appAttemptId, job);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void scheduleReduces(int totalMaps, int completedMaps,
|
|
|
+ int scheduledMaps, int scheduledReduces, int assignedMaps,
|
|
|
+ int assignedReduces, int mapResourceReqt, int reduceResourceReqt,
|
|
|
+ int numPendingReduces, float maxReduceRampupLimit, float reduceSlowStart) {
|
|
|
+ recalculatedReduceSchedule = true;
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
+ @Test
|
|
|
+ public void testCompletedTasksRecalculateSchedule() throws Exception {
|
|
|
+ LOG.info("Running testCompletedTasksRecalculateSchedule");
|
|
|
+
|
|
|
+ Configuration conf = new Configuration();
|
|
|
+ final MyResourceManager rm = new MyResourceManager(conf);
|
|
|
+ rm.start();
|
|
|
+ DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext()
|
|
|
+ .getDispatcher();
|
|
|
+
|
|
|
+ // Submit the application
|
|
|
+ RMApp app = rm.submitApp(1024);
|
|
|
+ dispatcher.await();
|
|
|
+
|
|
|
+ ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt()
|
|
|
+ .getAppAttemptId();
|
|
|
+ JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
|
|
|
+ Job job = mock(Job.class);
|
|
|
+ when(job.getReport()).thenReturn(
|
|
|
+ MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
|
|
|
+ 0, 0, 0, 0, 0, 0, "jobfile", null, false));
|
|
|
+ doReturn(10).when(job).getTotalMaps();
|
|
|
+ doReturn(10).when(job).getTotalReduces();
|
|
|
+ doReturn(0).when(job).getCompletedMaps();
|
|
|
+ RecalculateContainerAllocator allocator =
|
|
|
+ new RecalculateContainerAllocator(rm, conf, appAttemptId, job);
|
|
|
+ allocator.schedule();
|
|
|
+
|
|
|
+ allocator.recalculatedReduceSchedule = false;
|
|
|
+ allocator.schedule();
|
|
|
+ Assert.assertFalse("Unexpected recalculate of reduce schedule",
|
|
|
+ allocator.recalculatedReduceSchedule);
|
|
|
+
|
|
|
+ doReturn(1).when(job).getCompletedMaps();
|
|
|
+ allocator.schedule();
|
|
|
+ Assert.assertTrue("Expected recalculate of reduce schedule",
|
|
|
+ allocator.recalculatedReduceSchedule);
|
|
|
+ }
|
|
|
+
|
|
|
public static void main(String[] args) throws Exception {
|
|
|
TestRMContainerAllocator t = new TestRMContainerAllocator();
|
|
|
t.testSimple();
|
|
@@ -1418,6 +1474,7 @@ public class TestRMContainerAllocator {
|
|
|
t.testReportedAppProgress();
|
|
|
t.testReportedAppProgressWithOnlyMaps();
|
|
|
t.testBlackListedNodes();
|
|
|
+ t.testCompletedTasksRecalculateSchedule();
|
|
|
}
|
|
|
|
|
|
}
|