|
@@ -3875,4 +3875,42 @@ public class TestCapacityScheduler {
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ @Test
|
|
|
+ public void testSchedulingOnRemovedNode() throws Exception {
|
|
|
+ Configuration conf = new YarnConfiguration();
|
|
|
+ conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
|
|
|
+ ResourceScheduler.class);
|
|
|
+ conf.setBoolean(
|
|
|
+ CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_ENABLE,
|
|
|
+ false);
|
|
|
+
|
|
|
+ MockRM rm = new MockRM(conf);
|
|
|
+ rm.start();
|
|
|
+ RMApp app = rm.submitApp(100);
|
|
|
+ rm.drainEvents();
|
|
|
+
|
|
|
+ MockNM nm1 = rm.registerNode("127.0.0.1:1234", 10240, 10);
|
|
|
+ MockAM am = MockRM.launchAndRegisterAM(app, rm, nm1);
|
|
|
+
|
|
|
+ //remove nm2 to keep am alive
|
|
|
+ MockNM nm2 = rm.registerNode("127.0.0.1:1235", 10240, 10);
|
|
|
+
|
|
|
+ am.allocate(ResourceRequest.ANY, 2048, 1, null);
|
|
|
+
|
|
|
+ CapacityScheduler scheduler =
|
|
|
+ (CapacityScheduler) rm.getRMContext().getScheduler();
|
|
|
+ FiCaSchedulerNode node = scheduler.getAllNodes().get(nm2.getNodeId());
|
|
|
+ scheduler.handle(new NodeRemovedSchedulerEvent(
|
|
|
+ rm.getRMContext().getRMNodes().get(nm2.getNodeId())));
|
|
|
+ // schedulerNode is removed, try allocate a container
|
|
|
+ scheduler.allocateContainersToNode(node);
|
|
|
+
|
|
|
+ AppAttemptRemovedSchedulerEvent appRemovedEvent1 =
|
|
|
+ new AppAttemptRemovedSchedulerEvent(
|
|
|
+ am.getApplicationAttemptId(),
|
|
|
+ RMAppAttemptState.FINISHED, false);
|
|
|
+ scheduler.handle(appRemovedEvent1);
|
|
|
+ rm.stop();
|
|
|
+ }
|
|
|
}
|