|
@@ -51,8 +51,6 @@ public class TestCapacitySchedulerQueueMappingFactory {
|
|
|
public static final String USER = "user_";
|
|
|
public static final String PARENT_QUEUE = "c";
|
|
|
|
|
|
- private MockRM mockRM = null;
|
|
|
-
|
|
|
public static CapacitySchedulerConfiguration setupQueueMappingsForRules(
|
|
|
CapacitySchedulerConfiguration conf, String parentQueue,
|
|
|
boolean overrideWithQueueMappings, int[] sourceIds) {
|
|
@@ -114,23 +112,30 @@ public class TestCapacitySchedulerQueueMappingFactory {
|
|
|
// init queue mapping for UserGroupMappingRule and AppNameMappingRule
|
|
|
setupQueueMappingsForRules(conf, PARENT_QUEUE, true, new int[] {1, 2, 3});
|
|
|
|
|
|
- mockRM = new MockRM(conf);
|
|
|
- CapacityScheduler cs = (CapacityScheduler) mockRM.getResourceScheduler();
|
|
|
- cs.updatePlacementRules();
|
|
|
- mockRM.start();
|
|
|
- cs.start();
|
|
|
-
|
|
|
- List<PlacementRule> rules = cs.getRMContext()
|
|
|
- .getQueuePlacementManager().getPlacementRules();
|
|
|
-
|
|
|
- List<String> placementRuleNames = new ArrayList<>();
|
|
|
- for (PlacementRule pr : rules) {
|
|
|
- placementRuleNames.add(pr.getName());
|
|
|
+ MockRM mockRM = null;
|
|
|
+ try {
|
|
|
+ mockRM = new MockRM(conf);
|
|
|
+ CapacityScheduler cs = (CapacityScheduler) mockRM.getResourceScheduler();
|
|
|
+ cs.updatePlacementRules();
|
|
|
+ mockRM.start();
|
|
|
+ cs.start();
|
|
|
+
|
|
|
+ List<PlacementRule> rules = cs.getRMContext()
|
|
|
+ .getQueuePlacementManager().getPlacementRules();
|
|
|
+
|
|
|
+ List<String> placementRuleNames = new ArrayList<>();
|
|
|
+ for (PlacementRule pr : rules) {
|
|
|
+ placementRuleNames.add(pr.getName());
|
|
|
+ }
|
|
|
+
|
|
|
+ // verify both placement rules were added successfully
|
|
|
+ assertThat(placementRuleNames, hasItems(QUEUE_MAPPING_RULE_USER_GROUP));
|
|
|
+ assertThat(placementRuleNames, hasItems(QUEUE_MAPPING_RULE_APP_NAME));
|
|
|
+ } finally {
|
|
|
+ if(mockRM != null) {
|
|
|
+ mockRM.close();
|
|
|
+ }
|
|
|
}
|
|
|
-
|
|
|
- // verify both placement rules were added successfully
|
|
|
- assertThat(placementRuleNames, hasItems(QUEUE_MAPPING_RULE_USER_GROUP));
|
|
|
- assertThat(placementRuleNames, hasItems(QUEUE_MAPPING_RULE_APP_NAME));
|
|
|
}
|
|
|
|
|
|
@Test
|
|
@@ -173,28 +178,35 @@ public class TestCapacitySchedulerQueueMappingFactory {
|
|
|
// override with queue mappings
|
|
|
conf.setOverrideWithQueueMappings(true);
|
|
|
|
|
|
- mockRM = new MockRM(conf);
|
|
|
- CapacityScheduler cs = (CapacityScheduler) mockRM.getResourceScheduler();
|
|
|
- cs.updatePlacementRules();
|
|
|
- mockRM.start();
|
|
|
- cs.start();
|
|
|
-
|
|
|
- ApplicationSubmissionContext asc =
|
|
|
- Records.newRecord(ApplicationSubmissionContext.class);
|
|
|
- asc.setQueue("default");
|
|
|
-
|
|
|
- List<PlacementRule> rules =
|
|
|
- cs.getRMContext().getQueuePlacementManager().getPlacementRules();
|
|
|
-
|
|
|
- UserGroupMappingPlacementRule r =
|
|
|
- (UserGroupMappingPlacementRule) rules.get(0);
|
|
|
-
|
|
|
- ApplicationPlacementContext ctx = r.getPlacementForApp(asc, "user1");
|
|
|
- assertEquals("Queue", "b1", ctx.getQueue());
|
|
|
-
|
|
|
- ApplicationPlacementContext ctx2 = r.getPlacementForApp(asc, "user2");
|
|
|
- assertEquals("Queue", "user2", ctx2.getQueue());
|
|
|
- assertEquals("Queue", "c", ctx2.getParentQueue());
|
|
|
+ MockRM mockRM = null;
|
|
|
+ try {
|
|
|
+ mockRM = new MockRM(conf);
|
|
|
+ CapacityScheduler cs = (CapacityScheduler) mockRM.getResourceScheduler();
|
|
|
+ cs.updatePlacementRules();
|
|
|
+ mockRM.start();
|
|
|
+ cs.start();
|
|
|
+
|
|
|
+ ApplicationSubmissionContext asc =
|
|
|
+ Records.newRecord(ApplicationSubmissionContext.class);
|
|
|
+ asc.setQueue("default");
|
|
|
+
|
|
|
+ List<PlacementRule> rules =
|
|
|
+ cs.getRMContext().getQueuePlacementManager().getPlacementRules();
|
|
|
+
|
|
|
+ UserGroupMappingPlacementRule r =
|
|
|
+ (UserGroupMappingPlacementRule) rules.get(0);
|
|
|
+
|
|
|
+ ApplicationPlacementContext ctx = r.getPlacementForApp(asc, "user1");
|
|
|
+ assertEquals("Queue", "b1", ctx.getQueue());
|
|
|
+
|
|
|
+ ApplicationPlacementContext ctx2 = r.getPlacementForApp(asc, "user2");
|
|
|
+ assertEquals("Queue", "user2", ctx2.getQueue());
|
|
|
+ assertEquals("Queue", "c", ctx2.getParentQueue());
|
|
|
+ } finally {
|
|
|
+ if(mockRM != null) {
|
|
|
+ mockRM.close();
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
@Test
|
|
@@ -303,29 +315,105 @@ public class TestCapacitySchedulerQueueMappingFactory {
|
|
|
// override with queue mappings
|
|
|
conf.setOverrideWithQueueMappings(true);
|
|
|
|
|
|
- mockRM = new MockRM(conf);
|
|
|
- CapacityScheduler cs = (CapacityScheduler) mockRM.getResourceScheduler();
|
|
|
- cs.updatePlacementRules();
|
|
|
- mockRM.start();
|
|
|
- cs.start();
|
|
|
+ MockRM mockRM = null;
|
|
|
+ try {
|
|
|
+ mockRM = new MockRM(conf);
|
|
|
+ CapacityScheduler cs = (CapacityScheduler) mockRM.getResourceScheduler();
|
|
|
+ cs.updatePlacementRules();
|
|
|
+ mockRM.start();
|
|
|
+ cs.start();
|
|
|
+
|
|
|
+ ApplicationSubmissionContext asc =
|
|
|
+ Records.newRecord(ApplicationSubmissionContext.class);
|
|
|
+ asc.setQueue("default");
|
|
|
+
|
|
|
+ List<PlacementRule> rules =
|
|
|
+ cs.getRMContext().getQueuePlacementManager().getPlacementRules();
|
|
|
+
|
|
|
+ UserGroupMappingPlacementRule r =
|
|
|
+ (UserGroupMappingPlacementRule) rules.get(0);
|
|
|
+ ApplicationPlacementContext ctx = r.getPlacementForApp(asc, user);
|
|
|
+ assertEquals("Queue", user, ctx.getQueue());
|
|
|
+
|
|
|
+ if (primary) {
|
|
|
+ assertEquals("Primary Group", user + "group", ctx.getParentQueue());
|
|
|
+ } else {
|
|
|
+ assertEquals("Secondary Group", user + "subgroup1",
|
|
|
+ ctx.getParentQueue());
|
|
|
+ }
|
|
|
+ } finally {
|
|
|
+ if (mockRM != null) {
|
|
|
+ mockRM.close();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
- ApplicationSubmissionContext asc =
|
|
|
- Records.newRecord(ApplicationSubmissionContext.class);
|
|
|
- asc.setQueue("default");
|
|
|
+ @Test
|
|
|
+ public void testDynamicPrimaryGroupQueue() throws Exception {
|
|
|
+ CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
|
|
|
+ setupQueueConfiguration(conf);
|
|
|
+ conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
|
|
|
+ ResourceScheduler.class);
|
|
|
+ conf.setClass(CommonConfigurationKeys.HADOOP_SECURITY_GROUP_MAPPING,
|
|
|
+ SimpleGroupsMapping.class, GroupMappingServiceProvider.class);
|
|
|
|
|
|
- List<PlacementRule> rules =
|
|
|
- cs.getRMContext().getQueuePlacementManager().getPlacementRules();
|
|
|
+ List<String> queuePlacementRules = new ArrayList<>();
|
|
|
+ queuePlacementRules.add(QUEUE_MAPPING_RULE_USER_GROUP);
|
|
|
+ conf.setQueuePlacementRules(queuePlacementRules);
|
|
|
|
|
|
- UserGroupMappingPlacementRule r =
|
|
|
- (UserGroupMappingPlacementRule) rules.get(0);
|
|
|
- ApplicationPlacementContext ctx = r.getPlacementForApp(asc, user);
|
|
|
- assertEquals("Queue", user, ctx.getQueue());
|
|
|
+ List<UserGroupMappingPlacementRule.QueueMapping> existingMappingsForUG =
|
|
|
+ conf.getQueueMappings();
|
|
|
+
|
|
|
+ // set queue mapping
|
|
|
+ List<UserGroupMappingPlacementRule.QueueMapping> queueMappingsForUG =
|
|
|
+ new ArrayList<>();
|
|
|
+
|
|
|
+ // u:user1:b1
|
|
|
+ UserGroupMappingPlacementRule.QueueMapping userQueueMapping1 =
|
|
|
+ new UserGroupMappingPlacementRule.QueueMapping(
|
|
|
+ UserGroupMappingPlacementRule.QueueMapping.MappingType.USER,
|
|
|
+ "user1", "b1");
|
|
|
+
|
|
|
+ // u:user2:%primary_group
|
|
|
+ UserGroupMappingPlacementRule.QueueMapping userQueueMapping2 =
|
|
|
+ new UserGroupMappingPlacementRule.QueueMapping(
|
|
|
+ UserGroupMappingPlacementRule.QueueMapping.MappingType.USER,
|
|
|
+ "user2", "%primary_group");
|
|
|
+
|
|
|
+ queueMappingsForUG.add(userQueueMapping1);
|
|
|
+ queueMappingsForUG.add(userQueueMapping2);
|
|
|
+ existingMappingsForUG.addAll(queueMappingsForUG);
|
|
|
+ conf.setQueueMappings(existingMappingsForUG);
|
|
|
+
|
|
|
+ // override with queue mappings
|
|
|
+ conf.setOverrideWithQueueMappings(true);
|
|
|
|
|
|
- if (primary) {
|
|
|
- assertEquals("Primary Group", user + "group", ctx.getParentQueue());
|
|
|
- } else {
|
|
|
- assertEquals("Secondary Group", user + "subgroup1", ctx.getParentQueue());
|
|
|
+ MockRM mockRM = null;
|
|
|
+ try {
|
|
|
+ mockRM = new MockRM(conf);
|
|
|
+ CapacityScheduler cs = (CapacityScheduler) mockRM.getResourceScheduler();
|
|
|
+ cs.updatePlacementRules();
|
|
|
+ mockRM.start();
|
|
|
+ cs.start();
|
|
|
+
|
|
|
+ ApplicationSubmissionContext asc =
|
|
|
+ Records.newRecord(ApplicationSubmissionContext.class);
|
|
|
+ asc.setQueue("default");
|
|
|
+
|
|
|
+ List<PlacementRule> rules =
|
|
|
+ cs.getRMContext().getQueuePlacementManager().getPlacementRules();
|
|
|
+ UserGroupMappingPlacementRule r =
|
|
|
+ (UserGroupMappingPlacementRule) rules.get(0);
|
|
|
+
|
|
|
+ ApplicationPlacementContext ctx = r.getPlacementForApp(asc, "user1");
|
|
|
+ assertEquals("Queue", "b1", ctx.getQueue());
|
|
|
+
|
|
|
+ ApplicationPlacementContext ctx1 = r.getPlacementForApp(asc, "user2");
|
|
|
+ assertEquals("Queue", "user2group", ctx1.getQueue());
|
|
|
+ } finally {
|
|
|
+ if (mockRM != null) {
|
|
|
+ mockRM.close();
|
|
|
+ }
|
|
|
}
|
|
|
- mockRM.close();
|
|
|
}
|
|
|
}
|