|
@@ -27,9 +27,16 @@ import java.util.Arrays;
|
|
|
import java.util.HashMap;
|
|
|
import java.util.List;
|
|
|
import java.util.Map;
|
|
|
+import java.util.Set;
|
|
|
+import java.util.stream.Collectors;
|
|
|
|
|
|
+import com.google.common.collect.Lists;
|
|
|
+import com.google.common.collect.Sets;
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
import org.apache.hadoop.service.Service;
|
|
|
+import org.apache.hadoop.test.GenericTestUtils;
|
|
|
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
|
|
|
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
|
|
|
import org.apache.hadoop.yarn.api.records.*;
|
|
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|
|
import org.apache.hadoop.yarn.event.Dispatcher;
|
|
@@ -416,6 +423,103 @@ public class TestAbstractYarnScheduler extends ParameterizedSchedulerTestBase {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ @Test(timeout = 30000l)
|
|
|
+ public void testContainerReleaseWithAllocationTags() throws Exception {
|
|
|
+ // Currently only can be tested against capacity scheduler.
|
|
|
+ if (getSchedulerType().equals(SchedulerType.CAPACITY)) {
|
|
|
+ final String testTag1 = "some-tag";
|
|
|
+ final String testTag2 = "some-other-tag";
|
|
|
+ YarnConfiguration conf = getConf();
|
|
|
+ conf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER, "scheduler");
|
|
|
+ MockRM rm1 = new MockRM(conf);
|
|
|
+ rm1.start();
|
|
|
+ MockNM nm1 = new MockNM("127.0.0.1:1234",
|
|
|
+ 10240, rm1.getResourceTrackerService());
|
|
|
+ nm1.registerNode();
|
|
|
+ RMApp app1 =
|
|
|
+ rm1.submitApp(200, "name", "user", new HashMap<>(), false, "default",
|
|
|
+ -1, null, "Test", false, true);
|
|
|
+ MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
|
|
|
+
|
|
|
+ // allocate 1 container with tag1
|
|
|
+ SchedulingRequest sr = SchedulingRequest
|
|
|
+ .newInstance(1l, Priority.newInstance(1),
|
|
|
+ ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED),
|
|
|
+ Sets.newHashSet(testTag1),
|
|
|
+ ResourceSizing.newInstance(1, Resource.newInstance(1024, 1)),
|
|
|
+ null);
|
|
|
+
|
|
|
+ // allocate 3 containers with tag2
|
|
|
+ SchedulingRequest sr1 = SchedulingRequest
|
|
|
+ .newInstance(2l, Priority.newInstance(1),
|
|
|
+ ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED),
|
|
|
+ Sets.newHashSet(testTag2),
|
|
|
+ ResourceSizing.newInstance(3, Resource.newInstance(1024, 1)),
|
|
|
+ null);
|
|
|
+
|
|
|
+ AllocateRequest ar = AllocateRequest.newBuilder()
|
|
|
+ .schedulingRequests(Lists.newArrayList(sr, sr1)).build();
|
|
|
+ am1.allocate(ar);
|
|
|
+ nm1.nodeHeartbeat(true);
|
|
|
+
|
|
|
+ List<Container> allocated = new ArrayList<>();
|
|
|
+ while (allocated.size() < 4) {
|
|
|
+ AllocateResponse rsp = am1
|
|
|
+ .allocate(new ArrayList<>(), new ArrayList<>());
|
|
|
+ allocated.addAll(rsp.getAllocatedContainers());
|
|
|
+ nm1.nodeHeartbeat(true);
|
|
|
+ Thread.sleep(1000);
|
|
|
+ }
|
|
|
+
|
|
|
+ Assert.assertEquals(4, allocated.size());
|
|
|
+
|
|
|
+ Set<Container> containers = allocated.stream()
|
|
|
+ .filter(container -> container.getAllocationRequestId() == 1l)
|
|
|
+ .collect(Collectors.toSet());
|
|
|
+ Assert.assertNotNull(containers);
|
|
|
+ Assert.assertEquals(1, containers.size());
|
|
|
+ ContainerId cid = containers.iterator().next().getId();
|
|
|
+
|
|
|
+ // mock container start
|
|
|
+ rm1.getRMContext().getScheduler()
|
|
|
+ .getSchedulerNode(nm1.getNodeId()).containerStarted(cid);
|
|
|
+
|
|
|
+ // verifies the allocation is made with correct number of tags
|
|
|
+ Map<String, Long> nodeTags = rm1.getRMContext()
|
|
|
+ .getAllocationTagsManager()
|
|
|
+ .getAllocationTagsWithCount(nm1.getNodeId());
|
|
|
+ Assert.assertNotNull(nodeTags.get(testTag1));
|
|
|
+ Assert.assertEquals(1, nodeTags.get(testTag1).intValue());
|
|
|
+
|
|
|
+ // release a container
|
|
|
+ am1.allocate(new ArrayList<>(), Lists.newArrayList(cid));
|
|
|
+
|
|
|
+ // before NM confirms, the tag should still exist
|
|
|
+ nodeTags = rm1.getRMContext().getAllocationTagsManager()
|
|
|
+ .getAllocationTagsWithCount(nm1.getNodeId());
|
|
|
+ Assert.assertNotNull(nodeTags);
|
|
|
+ Assert.assertNotNull(nodeTags.get(testTag1));
|
|
|
+ Assert.assertEquals(1, nodeTags.get(testTag1).intValue());
|
|
|
+
|
|
|
+ // NM reports back that container is released
|
|
|
+ // RM should cleanup the tag
|
|
|
+ ContainerStatus cs = ContainerStatus.newInstance(cid,
|
|
|
+ ContainerState.COMPLETE, "", 0);
|
|
|
+ nm1.nodeHeartbeat(Lists.newArrayList(cs), true);
|
|
|
+
|
|
|
+ // Wait on condition
|
|
|
+ // 1) tag1 doesn't exist anymore
|
|
|
+ // 2) num of tag2 is still 3
|
|
|
+ GenericTestUtils.waitFor(() -> {
|
|
|
+ Map<String, Long> tags = rm1.getRMContext()
|
|
|
+ .getAllocationTagsManager()
|
|
|
+ .getAllocationTagsWithCount(nm1.getNodeId());
|
|
|
+ return tags.get(testTag1) == null &&
|
|
|
+ tags.get(testTag2).intValue() == 3;
|
|
|
+ }, 500, 3000);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
@Test(timeout=60000)
|
|
|
public void testContainerReleasedByNode() throws Exception {
|
|
|
System.out.println("Starting testContainerReleasedByNode");
|