|
@@ -69,6 +69,7 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
|
|
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementFactory;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementRule;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingPlacementRule;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingPlacementRule.QueueMapping;
|
|
@@ -563,15 +564,37 @@ public class CapacityScheduler extends
|
|
|
}
|
|
|
|
|
|
private void updatePlacementRules() throws IOException {
|
|
|
+ // Initialize placement rules
|
|
|
+ Collection<String> placementRuleStrs = conf.getStringCollection(
|
|
|
+ YarnConfiguration.QUEUE_PLACEMENT_RULES);
|
|
|
List<PlacementRule> placementRules = new ArrayList<>();
|
|
|
-
|
|
|
- // Initialize UserGroupMappingPlacementRule
|
|
|
- // TODO, need make this defineable by configuration.
|
|
|
- UserGroupMappingPlacementRule ugRule = getUserGroupMappingPlacementRule();
|
|
|
- if (null != ugRule) {
|
|
|
- placementRules.add(ugRule);
|
|
|
+ if (placementRuleStrs.isEmpty()) {
|
|
|
+ PlacementRule ugRule = getUserGroupMappingPlacementRule();
|
|
|
+ if (null != ugRule) {
|
|
|
+ placementRules.add(ugRule);
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ for (String placementRuleStr : placementRuleStrs) {
|
|
|
+ switch (placementRuleStr) {
|
|
|
+ case YarnConfiguration.USER_GROUP_PLACEMENT_RULE:
|
|
|
+ PlacementRule ugRule = getUserGroupMappingPlacementRule();
|
|
|
+ if (null != ugRule) {
|
|
|
+ placementRules.add(ugRule);
|
|
|
+ }
|
|
|
+ break;
|
|
|
+ default:
|
|
|
+ try {
|
|
|
+ PlacementRule rule = PlacementFactory.getPlacementRule(
|
|
|
+ placementRuleStr, conf);
|
|
|
+ if (null != rule) {
|
|
|
+ placementRules.add(rule);
|
|
|
+ }
|
|
|
+ } catch (ClassNotFoundException cnfe) {
|
|
|
+ throw new IOException(cnfe);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
-
|
|
|
rmContext.getQueuePlacementManager().updateRules(placementRules);
|
|
|
}
|
|
|
|