|
@@ -116,6 +116,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaS
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
|
|
@@ -2848,4 +2849,42 @@ public class TestCapacityScheduler {
|
|
|
+ CapacitySchedulerConfiguration.MAXIMUM_ALLOCATION_VCORES;
|
|
|
conf.setInt(propName, maxAllocVcores);
|
|
|
}
|
|
|
+
|
|
|
+ @Test
|
|
|
+ public void testSchedulingOnRemovedNode() throws Exception {
|
|
|
+ Configuration conf = new YarnConfiguration();
|
|
|
+ conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
|
|
|
+ ResourceScheduler.class);
|
|
|
+ conf.setBoolean(
|
|
|
+ CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_ENABLE,
|
|
|
+ false);
|
|
|
+
|
|
|
+ MockRM rm = new MockRM(conf);
|
|
|
+ rm.start();
|
|
|
+ RMApp app = rm.submitApp(100);
|
|
|
+ rm.drainEvents();
|
|
|
+
|
|
|
+ MockNM nm1 = rm.registerNode("127.0.0.1:1234", 10240, 10);
|
|
|
+ MockAM am = MockRM.launchAndRegisterAM(app, rm, nm1);
|
|
|
+
|
|
|
+ //remove nm2 to keep am alive
|
|
|
+ MockNM nm2 = rm.registerNode("127.0.0.1:1235", 10240, 10);
|
|
|
+
|
|
|
+ am.allocate(ResourceRequest.ANY, 2048, 1, null);
|
|
|
+
|
|
|
+ CapacityScheduler scheduler =
|
|
|
+ (CapacityScheduler) rm.getRMContext().getScheduler();
|
|
|
+ FiCaSchedulerNode node = scheduler.getAllNodes().get(nm2.getNodeId());
|
|
|
+ scheduler.handle(new NodeRemovedSchedulerEvent(
|
|
|
+ rm.getRMContext().getRMNodes().get(nm2.getNodeId())));
|
|
|
+ // schedulerNode is removed, try allocate a container
|
|
|
+ scheduler.allocateContainersToNode(node);
|
|
|
+
|
|
|
+ AppAttemptRemovedSchedulerEvent appRemovedEvent1 =
|
|
|
+ new AppAttemptRemovedSchedulerEvent(
|
|
|
+ am.getApplicationAttemptId(),
|
|
|
+ RMAppAttemptState.FINISHED, false);
|
|
|
+ scheduler.handle(appRemovedEvent1);
|
|
|
+ rm.stop();
|
|
|
+ }
|
|
|
}
|