Browse Source

YARN-1392: Add new files

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1542106 13f79535-47bb-0310-9956-ffa450edef68
Sanford Ryza 11 years ago
parent
commit
6ac4ac05f7

+ 121 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueuePlacementPolicy.java

@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.Groups;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.w3c.dom.Element;
+import org.w3c.dom.Node;
+import org.w3c.dom.NodeList;
+
+public class QueuePlacementPolicy {
+  private static final Map<String, Class<? extends QueuePlacementRule>> ruleClasses;
+  static {
+    Map<String, Class<? extends QueuePlacementRule>> map =
+        new HashMap<String, Class<? extends QueuePlacementRule>>();
+    map.put("user", QueuePlacementRule.User.class);
+    map.put("primaryGroup", QueuePlacementRule.PrimaryGroup.class);
+    map.put("specified", QueuePlacementRule.Specified.class);
+    map.put("default", QueuePlacementRule.Default.class);
+    map.put("reject", QueuePlacementRule.Reject.class);
+    ruleClasses = Collections.unmodifiableMap(map);
+  }
+  
+  private final List<QueuePlacementRule> rules;
+  private final Set<String> configuredQueues;
+  private final Groups groups;
+  
+  public QueuePlacementPolicy(List<QueuePlacementRule> rules,
+      Set<String> configuredQueues, Configuration conf)
+      throws AllocationConfigurationException {
+    for (int i = 0; i < rules.size()-1; i++) {
+      if (rules.get(i).isTerminal()) {
+        throw new AllocationConfigurationException("Rules after rule "
+            + i + " in queue placement policy can never be reached");
+      }
+    }
+    if (!rules.get(rules.size()-1).isTerminal()) {
+      throw new AllocationConfigurationException(
+          "Could get past last queue placement rule without assigning");
+    }
+    this.rules = rules;
+    this.configuredQueues = configuredQueues;
+    groups = new Groups(conf);
+  }
+  
+  /**
+   * Builds a QueuePlacementPolicy from an xml element.
+   */
+  public static QueuePlacementPolicy fromXml(Element el, Set<String> configuredQueues,
+      Configuration conf) throws AllocationConfigurationException {
+    List<QueuePlacementRule> rules = new ArrayList<QueuePlacementRule>();
+    NodeList elements = el.getChildNodes();
+    for (int i = 0; i < elements.getLength(); i++) {
+      Node node = elements.item(i);
+      if (node instanceof Element) {
+        Element element = (Element)node;
+        String ruleName = element.getTagName();
+        Class<? extends QueuePlacementRule> clazz = ruleClasses.get(ruleName);
+        if (clazz == null) {
+          throw new AllocationConfigurationException("No rule class found for "
+              + ruleName);
+        }
+        QueuePlacementRule rule = ReflectionUtils.newInstance(clazz, null);
+        rule.initializeFromXml(element);
+        rules.add(rule);
+      }
+    }
+    return new QueuePlacementPolicy(rules, configuredQueues, conf);
+  }
+  
+  /**
+   * Applies this rule to an app with the given requested queue and user/group
+   * information.
+   * 
+   * @param requestedQueue
+   *    The queue specified in the ApplicationSubmissionContext
+   * @param user
+   *    The user submitting the app
+   * @return
+   *    The name of the queue to assign the app to.  Or null if the app should
+   *    be rejected.
+   * @throws IOException
+   *    If an exception is encountered while getting the user's groups
+   */
+  public String assignAppToQueue(String requestedQueue, String user)
+      throws IOException {
+    for (QueuePlacementRule rule : rules) {
+      String queue = rule.assignAppToQueue(requestedQueue, user, groups,
+          configuredQueues);
+      if (queue == null || !queue.isEmpty()) {
+        return queue;
+      }
+    }
+    throw new IllegalStateException("Should have applied a rule before " +
+    		"reaching here");
+  }
+}

+ 200 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueuePlacementRule.java

@@ -0,0 +1,200 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.security.Groups;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.w3c.dom.Element;
+import org.w3c.dom.NamedNodeMap;
+import org.w3c.dom.Node;
+
+public abstract class QueuePlacementRule {
+  protected boolean create;
+  
+  /**
+   * Initializes the rule with any arguments.
+   * 
+   * @param args
+   *    Additional attributes of the rule's xml element other than create.
+   */
+  public QueuePlacementRule initialize(boolean create, Map<String, String> args) {
+    this.create = create;
+    return this;
+  }
+  
+  /**
+   * 
+   * @param requestedQueue
+   *    The queue explicitly requested.
+   * @param user
+   *    The user submitting the app.
+   * @param groups
+   *    The groups of the user submitting the app.
+   * @param configuredQueues
+   *    The queues specified in the scheduler configuration.
+   * @return
+   *    The queue to place the app into. An empty string indicates that we should
+   *    continue to the next rule, and null indicates that the app should be rejected.
+   */
+  public String assignAppToQueue(String requestedQueue, String user,
+      Groups groups, Collection<String> configuredQueues) throws IOException {
+    String queue = getQueueForApp(requestedQueue, user, groups);
+    if (create || configuredQueues.contains(queue)) {
+      return queue;
+    } else {
+      return "";
+    }
+  }
+  
+  public void initializeFromXml(Element el) {
+    boolean create = true;
+    NamedNodeMap attributes = el.getAttributes();
+    Map<String, String> args = new HashMap<String, String>();
+    for (int i = 0; i < attributes.getLength(); i++) {
+      Node node = attributes.item(i);
+      String key = node.getNodeName();
+      String value = node.getNodeValue();
+      if (key.equals("create")) {
+        create = Boolean.parseBoolean(value);
+      } else {
+        args.put(key, value);
+      }
+    }
+    initialize(create, args);
+  }
+  
+  /**
+   * Returns true if this rule never tells the policy to continue.
+   */
+  public abstract boolean isTerminal();
+  
+  /**
+   * Applies this rule to an app with the given requested queue and user/group
+   * information.
+   * 
+   * @param requestedQueue
+   *    The queue specified in the ApplicationSubmissionContext
+   * @param user
+   *    The user submitting the app.
+   * @param groups
+   *    The groups of the user submitting the app.
+   * @return
+   *    The name of the queue to assign the app to, or null to empty string
+   *    continue to the next rule.
+   */
+  protected abstract String getQueueForApp(String requestedQueue, String user,
+      Groups groups) throws IOException;
+
+  /**
+   * Places apps in queues by username of the submitter
+   */
+  public static class User extends QueuePlacementRule {
+    @Override
+    protected String getQueueForApp(String requestedQueue,
+        String user, Groups groups) {
+      return "root." + user;
+    }
+    
+    @Override
+    public boolean isTerminal() {
+      return create;
+    }
+  }
+  
+  /**
+   * Places apps in queues by primary group of the submitter
+   */
+  public static class PrimaryGroup extends QueuePlacementRule {
+    @Override
+    protected String getQueueForApp(String requestedQueue,
+        String user, Groups groups) throws IOException {
+      return "root." + groups.getGroups(user).get(0);
+    }
+    
+    @Override
+    public boolean isTerminal() {
+      return create;
+    }
+  }
+
+  /**
+   * Places apps in queues by requested queue of the submitter
+   */
+  public static class Specified extends QueuePlacementRule {
+    @Override
+    protected String getQueueForApp(String requestedQueue,
+        String user, Groups groups) {
+      if (requestedQueue.equals(YarnConfiguration.DEFAULT_QUEUE_NAME)) {
+        return "";
+      } else {
+        if (!requestedQueue.startsWith("root.")) {
+          requestedQueue = "root." + requestedQueue;
+        }
+        return requestedQueue;
+      }
+    }
+    
+    @Override
+    public boolean isTerminal() {
+      return false;
+    }
+  }
+  
+  /**
+   * Places all apps in the default queue
+   */
+  public static class Default extends QueuePlacementRule {
+    @Override
+    protected String getQueueForApp(String requestedQueue, String user,
+        Groups groups) {
+      return "root." + YarnConfiguration.DEFAULT_QUEUE_NAME;
+    }
+    
+    @Override
+    public boolean isTerminal() {
+      return create;
+    }
+  }
+  
+  /**
+   * Rejects all apps
+   */
+  public static class Reject extends QueuePlacementRule {
+    @Override
+    public String assignAppToQueue(String requestedQueue, String user,
+        Groups groups, Collection<String> configuredQueues) {
+      return null;
+    }
+    
+    @Override
+    protected String getQueueForApp(String requestedQueue, String user,
+        Groups groups) {
+      throw new UnsupportedOperationException();
+    }
+    
+    @Override
+    public boolean isTerminal() {
+      return true;
+    }
+  }
+}

+ 44 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SimpleGroupsMapping.java

@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.hadoop.security.GroupMappingServiceProvider;
+
+public class SimpleGroupsMapping implements GroupMappingServiceProvider {
+  
+  @Override
+  public List<String> getGroups(String user) {
+    return Arrays.asList(user + "group");
+  }
+
+  @Override
+  public void cacheGroupsRefresh() throws IOException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void cacheGroupsAdd(List<String> groups) throws IOException {
+    throw new UnsupportedOperationException();
+  }
+
+}

+ 119 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestQueuePlacementPolicy.java

@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Set;
+
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.security.GroupMappingServiceProvider;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+
+import com.google.common.collect.Sets;
+
+public class TestQueuePlacementPolicy {
+  private final static Configuration conf = new Configuration();
+  private final static Set<String> configuredQueues = Sets.newHashSet("root.someuser");
+  
+  @BeforeClass
+  public static void setup() {
+    conf.setClass(CommonConfigurationKeys.HADOOP_SECURITY_GROUP_MAPPING,
+        SimpleGroupsMapping.class, GroupMappingServiceProvider.class);
+  }
+  
+  @Test
+  public void testSpecifiedUserPolicy() throws Exception {
+    StringBuffer sb = new StringBuffer();
+    sb.append("<queuePlacementPolicy>");
+    sb.append("  <specified />");
+    sb.append("  <user />");
+    sb.append("</queuePlacementPolicy>");
+    QueuePlacementPolicy policy = parse(sb.toString());
+    assertEquals("root.specifiedq",policy.assignAppToQueue("specifiedq", "someuser"));
+    assertEquals("root.someuser", policy.assignAppToQueue("default", "someuser"));
+    assertEquals("root.otheruser", policy.assignAppToQueue("default", "otheruser"));
+  }
+  
+  @Test
+  public void testNoCreate() throws Exception {
+    StringBuffer sb = new StringBuffer();
+    sb.append("<queuePlacementPolicy>");
+    sb.append("  <specified />");
+    sb.append("  <user create=\"false\" />");
+    sb.append("  <default />");
+    sb.append("</queuePlacementPolicy>");
+    QueuePlacementPolicy policy = parse(sb.toString());
+    assertEquals("root.specifiedq", policy.assignAppToQueue("specifiedq", "someuser"));
+    assertEquals("root.someuser", policy.assignAppToQueue("default", "someuser"));
+    assertEquals("root.specifiedq", policy.assignAppToQueue("specifiedq", "otheruser"));
+    assertEquals("root.default", policy.assignAppToQueue("default", "otheruser"));
+  }
+  
+  @Test
+  public void testSpecifiedThenReject() throws Exception {
+    StringBuffer sb = new StringBuffer();
+    sb.append("<queuePlacementPolicy>");
+    sb.append("  <specified />");
+    sb.append("  <reject />");
+    sb.append("</queuePlacementPolicy>");
+    QueuePlacementPolicy policy = parse(sb.toString());
+    assertEquals("root.specifiedq", policy.assignAppToQueue("specifiedq", "someuser"));
+    assertEquals(null, policy.assignAppToQueue("default", "someuser"));
+  }
+  
+  @Test (expected = AllocationConfigurationException.class)
+  public void testOmittedTerminalRule() throws Exception {
+    StringBuffer sb = new StringBuffer();
+    sb.append("<queuePlacementPolicy>");
+    sb.append("  <specified />");
+    sb.append("  <user create=\"false\" />");
+    sb.append("</queuePlacementPolicy>");
+    parse(sb.toString());
+  }
+  
+  @Test (expected = AllocationConfigurationException.class)
+  public void testTerminalRuleInMiddle() throws Exception {
+    StringBuffer sb = new StringBuffer();
+    sb.append("<queuePlacementPolicy>");
+    sb.append("  <specified />");
+    sb.append("  <default />");
+    sb.append("  <user />");
+    sb.append("</queuePlacementPolicy>");
+    parse(sb.toString());
+  }
+  
+  private QueuePlacementPolicy parse(String str) throws Exception {
+    // Read and parse the allocations file.
+    DocumentBuilderFactory docBuilderFactory =
+      DocumentBuilderFactory.newInstance();
+    docBuilderFactory.setIgnoringComments(true);
+    DocumentBuilder builder = docBuilderFactory.newDocumentBuilder();
+    Document doc = builder.parse(IOUtils.toInputStream(str));
+    Element root = doc.getDocumentElement();
+    return QueuePlacementPolicy.fromXml(root, configuredQueues, conf);
+  }
+}