|
@@ -537,6 +537,92 @@ public class TestAbstractYarnScheduler extends ParameterizedSchedulerTestBase {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+
|
|
|
+ @Test(timeout = 30000L)
|
|
|
+ public void testNodeRemovedWithAllocationTags() throws Exception {
|
|
|
+ // Currently only can be tested against capacity scheduler.
|
|
|
+ if (getSchedulerType().equals(SchedulerType.CAPACITY)) {
|
|
|
+ final String testTag1 = "some-tag";
|
|
|
+ YarnConfiguration conf = getConf();
|
|
|
+ conf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER, "scheduler");
|
|
|
+ MockRM rm1 = new MockRM(conf);
|
|
|
+ rm1.start();
|
|
|
+ MockNM nm1 = new MockNM("127.0.0.1:1234",
|
|
|
+ 10240, rm1.getResourceTrackerService());
|
|
|
+ nm1.registerNode();
|
|
|
+ MockRMAppSubmissionData data =
|
|
|
+ MockRMAppSubmissionData.Builder.createWithMemory(200, rm1)
|
|
|
+ .withAppName("name")
|
|
|
+ .withUser("user")
|
|
|
+ .withAcls(new HashMap<>())
|
|
|
+ .withUnmanagedAM(false)
|
|
|
+ .withQueue("default")
|
|
|
+ .withMaxAppAttempts(-1)
|
|
|
+ .withCredentials(null)
|
|
|
+ .withAppType("Test")
|
|
|
+ .withWaitForAppAcceptedState(false)
|
|
|
+ .withKeepContainers(true)
|
|
|
+ .build();
|
|
|
+ RMApp app1 =
|
|
|
+ MockRMAppSubmitter.submit(rm1, data);
|
|
|
+ MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
|
|
|
+
|
|
|
+ // allocate 1 container with tag1
|
|
|
+ SchedulingRequest sr = SchedulingRequest
|
|
|
+ .newInstance(1L, Priority.newInstance(1),
|
|
|
+ ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED),
|
|
|
+ Sets.newHashSet(testTag1),
|
|
|
+ ResourceSizing.newInstance(1, Resource.newInstance(1024, 1)),
|
|
|
+ null);
|
|
|
+
|
|
|
+ AllocateRequest ar = AllocateRequest.newBuilder()
|
|
|
+ .schedulingRequests(Lists.newArrayList(sr)).build();
|
|
|
+ am1.allocate(ar);
|
|
|
+ nm1.nodeHeartbeat(true);
|
|
|
+
|
|
|
+ List<Container> allocated = new ArrayList<>();
|
|
|
+ while (allocated.size() < 1) {
|
|
|
+ AllocateResponse rsp = am1
|
|
|
+ .allocate(new ArrayList<>(), new ArrayList<>());
|
|
|
+ allocated.addAll(rsp.getAllocatedContainers());
|
|
|
+ nm1.nodeHeartbeat(true);
|
|
|
+ Thread.sleep(1000);
|
|
|
+ }
|
|
|
+
|
|
|
+ Assert.assertEquals(1, allocated.size());
|
|
|
+
|
|
|
+ Set<Container> containers = allocated.stream()
|
|
|
+ .filter(container -> container.getAllocationRequestId() == 1L)
|
|
|
+ .collect(Collectors.toSet());
|
|
|
+ Assert.assertNotNull(containers);
|
|
|
+ Assert.assertEquals(1, containers.size());
|
|
|
+ ContainerId cid = containers.iterator().next().getId();
|
|
|
+
|
|
|
+ // mock container start
|
|
|
+ rm1.getRMContext().getScheduler()
|
|
|
+ .getSchedulerNode(nm1.getNodeId()).containerStarted(cid);
|
|
|
+
|
|
|
+ // verifies the allocation is made with correct number of tags
|
|
|
+ Map<String, Long> nodeTags = rm1.getRMContext()
|
|
|
+ .getAllocationTagsManager()
|
|
|
+ .getAllocationTagsWithCount(nm1.getNodeId());
|
|
|
+ Assert.assertNotNull(nodeTags.get(testTag1));
|
|
|
+ Assert.assertEquals(1, nodeTags.get(testTag1).intValue());
|
|
|
+
|
|
|
+ // remove the node
|
|
|
+ RMNode node1 = MockNodes.newNodeInfo(
|
|
|
+ 0, Resources.createResource(nm1.getMemory()), 1, "127.0.0.1", 1234);
|
|
|
+ rm1.getRMContext().getScheduler().handle(
|
|
|
+ new NodeRemovedSchedulerEvent(node1));
|
|
|
+
|
|
|
+ // Once the node is removed, the tag should be removed immediately
|
|
|
+ nodeTags = rm1.getRMContext().getAllocationTagsManager()
|
|
|
+ .getAllocationTagsWithCount(nm1.getNodeId());
|
|
|
+ Assert.assertNull(nodeTags);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
@Test(timeout=60000)
|
|
|
public void testContainerReleasedByNode() throws Exception {
|
|
|
System.out.println("Starting testContainerReleasedByNode");
|