Browse Source

YARN-1392. Allow sophisticated app-to-queue placement policies in the Fair Scheduler (Sandy Ryza)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1542108 13f79535-47bb-0310-9956-ffa450edef68
Sanford Ryza 11 years ago
parent
commit
516e657ac7
9 changed files with 666 additions and 33 deletions
  1. 3 0
      hadoop-yarn-project/CHANGES.txt
  2. 16 15
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
  3. 48 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java
  4. 121 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueuePlacementPolicy.java
  5. 200 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueuePlacementRule.java
  6. 44 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SimpleGroupsMapping.java
  7. 45 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
  8. 119 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestQueuePlacementPolicy.java
  9. 70 18
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/FairScheduler.apt.vm

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -22,6 +22,9 @@ Release 2.3.0 - UNRELEASED
     YARN-311. RM/scheduler support for dynamic resource configuration.
     YARN-311. RM/scheduler support for dynamic resource configuration.
     (Junping Du via llu)
     (Junping Du via llu)
 
 
+    YARN-1392. Allow sophisticated app-to-queue placement policies in the Fair
+    Scheduler (Sandy Ryza)
+
   IMPROVEMENTS
   IMPROVEMENTS
 
 
     YARN-7. Support CPU resource for DistributedShell. (Junping Du via llu)
     YARN-7. Support CPU resource for DistributedShell. (Junping Du via llu)

+ 16 - 15
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java

@@ -136,9 +136,6 @@ public class FairScheduler implements ResourceScheduler {
   // How often fair shares are re-calculated (ms)
   // How often fair shares are re-calculated (ms)
   protected long UPDATE_INTERVAL = 500;
   protected long UPDATE_INTERVAL = 500;
 
 
-  // Whether to use username in place of "default" queue name
-  private volatile boolean userAsDefaultQueue = false;
-
   private final static List<Container> EMPTY_CONTAINER_LIST =
   private final static List<Container> EMPTY_CONTAINER_LIST =
       new ArrayList<Container>();
       new ArrayList<Container>();
 
 
@@ -640,6 +637,12 @@ public class FairScheduler implements ResourceScheduler {
     RMApp rmApp = rmContext.getRMApps().get(
     RMApp rmApp = rmContext.getRMApps().get(
         applicationAttemptId.getApplicationId());
         applicationAttemptId.getApplicationId());
     FSLeafQueue queue = assignToQueue(rmApp, queueName, user);
     FSLeafQueue queue = assignToQueue(rmApp, queueName, user);
+    if (queue == null) {
+      rmContext.getDispatcher().getEventHandler().handle(
+          new RMAppAttemptRejectedEvent(applicationAttemptId,
+              "Application rejected by queue placement policy"));
+      return;
+    }
 
 
     FSSchedulerApp schedulerApp =
     FSSchedulerApp schedulerApp =
         new FSSchedulerApp(applicationAttemptId, user,
         new FSSchedulerApp(applicationAttemptId, user,
@@ -675,17 +678,16 @@ public class FairScheduler implements ResourceScheduler {
   
   
   @VisibleForTesting
   @VisibleForTesting
   FSLeafQueue assignToQueue(RMApp rmApp, String queueName, String user) {
   FSLeafQueue assignToQueue(RMApp rmApp, String queueName, String user) {
-    // Potentially set queue to username if configured to do so
-    if (queueName.equals(YarnConfiguration.DEFAULT_QUEUE_NAME) &&
-        userAsDefaultQueue) {
-      queueName = user;
-    }
-    
-    FSLeafQueue queue = queueMgr.getLeafQueue(queueName,
-        conf.getAllowUndeclaredPools());
-    if (queue == null) {
-      // queue is not an existing or createable leaf queue
-      queue = queueMgr.getLeafQueue(YarnConfiguration.DEFAULT_QUEUE_NAME, false);
+    FSLeafQueue queue = null;
+    try {
+      QueuePlacementPolicy policy = queueMgr.getPlacementPolicy();
+      queueName = policy.assignAppToQueue(queueName, user);
+      if (queueName == null) {
+        return null;
+      }
+      queue = queueMgr.getLeafQueue(queueName, true);
+    } catch (IOException ex) {
+      LOG.error("Error assigning app to queue, rejecting", ex);
     }
     }
     
     
     if (rmApp != null) {
     if (rmApp != null) {
@@ -1155,7 +1157,6 @@ public class FairScheduler implements ResourceScheduler {
     minimumAllocation = this.conf.getMinimumAllocation();
     minimumAllocation = this.conf.getMinimumAllocation();
     maximumAllocation = this.conf.getMaximumAllocation();
     maximumAllocation = this.conf.getMaximumAllocation();
     incrAllocation = this.conf.getIncrementAllocation();
     incrAllocation = this.conf.getIncrementAllocation();
-    userAsDefaultQueue = this.conf.getUserAsDefaultQueue();
     continuousSchedulingEnabled = this.conf.isContinuousSchedulingEnabled();
     continuousSchedulingEnabled = this.conf.isContinuousSchedulingEnabled();
     continuousSchedulingSleepMs =
     continuousSchedulingSleepMs =
             this.conf.getContinuousSchedulingSleepMs();
             this.conf.getContinuousSchedulingSleepMs();

+ 48 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java

@@ -25,6 +25,7 @@ import java.net.URLConnection;
 import java.util.ArrayList;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.List;
 import java.util.Map;
 import java.util.Map;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.CopyOnWriteArrayList;
@@ -51,6 +52,8 @@ import org.w3c.dom.NodeList;
 import org.w3c.dom.Text;
 import org.w3c.dom.Text;
 import org.xml.sax.SAXException;
 import org.xml.sax.SAXException;
 
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
 /**
  * Maintains a list of queues as well as scheduling parameters for each queue,
  * Maintains a list of queues as well as scheduling parameters for each queue,
  * such as guaranteed share allocations, from the fair scheduler config file.
  * such as guaranteed share allocations, from the fair scheduler config file.
@@ -87,6 +90,8 @@ public class QueueManager {
   private FSParentQueue rootQueue;
   private FSParentQueue rootQueue;
 
 
   private volatile QueueManagerInfo info = new QueueManagerInfo();
   private volatile QueueManagerInfo info = new QueueManagerInfo();
+  @VisibleForTesting
+  volatile QueuePlacementPolicy placementPolicy;
   
   
   private long lastReloadAttempt; // Last time we tried to reload the queues file
   private long lastReloadAttempt; // Last time we tried to reload the queues file
   private long lastSuccessfulReload; // Last time we successfully reloaded queues
   private long lastSuccessfulReload; // Last time we successfully reloaded queues
@@ -107,6 +112,8 @@ public class QueueManager {
     queues.put(rootQueue.getName(), rootQueue);
     queues.put(rootQueue.getName(), rootQueue);
     
     
     this.allocFile = conf.getAllocationFile();
     this.allocFile = conf.getAllocationFile();
+    placementPolicy = new QueuePlacementPolicy(getSimplePlacementRules(),
+        new HashSet<String>(), conf);
     
     
     reloadAllocs();
     reloadAllocs();
     lastSuccessfulReload = scheduler.getClock().getTime();
     lastSuccessfulReload = scheduler.getClock().getTime();
@@ -115,6 +122,28 @@ public class QueueManager {
     getLeafQueue(YarnConfiguration.DEFAULT_QUEUE_NAME, true);
     getLeafQueue(YarnConfiguration.DEFAULT_QUEUE_NAME, true);
   }
   }
   
   
+  public void updatePlacementPolicy(FairSchedulerConfiguration conf) {
+    
+  }
+  
+  /**
+   * Construct simple queue placement policy from allow-undeclared-pools and
+   * user-as-default-queue.
+   */
+  private List<QueuePlacementRule> getSimplePlacementRules() {
+    boolean create = scheduler.getConf().getAllowUndeclaredPools();
+    boolean userAsDefaultQueue = scheduler.getConf().getUserAsDefaultQueue();
+    List<QueuePlacementRule> rules = new ArrayList<QueuePlacementRule>();
+    rules.add(new QueuePlacementRule.Specified().initialize(create, null));
+    if (userAsDefaultQueue) {
+      rules.add(new QueuePlacementRule.User().initialize(create, null));
+    }
+    if (!userAsDefaultQueue || !create) {
+      rules.add(new QueuePlacementRule.Default().initialize(true, null));
+    }
+    return rules;
+  }
+  
   /**
   /**
    * Get a queue by name, creating it if the create param is true and is necessary.
    * Get a queue by name, creating it if the create param is true and is necessary.
    * If the queue is not or can not be a leaf queue, i.e. it already exists as a
    * If the queue is not or can not be a leaf queue, i.e. it already exists as a
@@ -226,6 +255,10 @@ public class QueueManager {
       return queues.containsKey(name);
       return queues.containsKey(name);
     }
     }
   }
   }
+  
+  public QueuePlacementPolicy getPlacementPolicy() {
+    return placementPolicy;
+  }
 
 
   /**
   /**
    * Reload allocations file if it hasn't been loaded in a while
    * Reload allocations file if it hasn't been loaded in a while
@@ -290,6 +323,8 @@ public class QueueManager {
     long fairSharePreemptionTimeout = Long.MAX_VALUE;
     long fairSharePreemptionTimeout = Long.MAX_VALUE;
     long defaultMinSharePreemptionTimeout = Long.MAX_VALUE;
     long defaultMinSharePreemptionTimeout = Long.MAX_VALUE;
     SchedulingPolicy defaultSchedPolicy = SchedulingPolicy.getDefault();
     SchedulingPolicy defaultSchedPolicy = SchedulingPolicy.getDefault();
+    
+    QueuePlacementPolicy newPlacementPolicy = null;
 
 
     // Remember all queue names so we can display them on web UI, etc.
     // Remember all queue names so we can display them on web UI, etc.
     List<String> queueNamesInAllocFile = new ArrayList<String>();
     List<String> queueNamesInAllocFile = new ArrayList<String>();
@@ -306,6 +341,7 @@ public class QueueManager {
           "file: top-level element not <allocations>");
           "file: top-level element not <allocations>");
     NodeList elements = root.getChildNodes();
     NodeList elements = root.getChildNodes();
     List<Element> queueElements = new ArrayList<Element>();
     List<Element> queueElements = new ArrayList<Element>();
+    Element placementPolicyElement = null;
     for (int i = 0; i < elements.getLength(); i++) {
     for (int i = 0; i < elements.getLength(); i++) {
       Node node = elements.item(i);
       Node node = elements.item(i);
       if (node instanceof Element) {
       if (node instanceof Element) {
@@ -348,6 +384,8 @@ public class QueueManager {
           String text = ((Text)element.getFirstChild()).getData().trim();
           String text = ((Text)element.getFirstChild()).getData().trim();
           SchedulingPolicy.setDefault(text);
           SchedulingPolicy.setDefault(text);
           defaultSchedPolicy = SchedulingPolicy.getDefault();
           defaultSchedPolicy = SchedulingPolicy.getDefault();
+        } else if ("queuePlacementPolicy".equals(element.getTagName())) {
+          placementPolicyElement = element;
         } else {
         } else {
           LOG.warn("Bad element in allocations file: " + element.getTagName());
           LOG.warn("Bad element in allocations file: " + element.getTagName());
         }
         }
@@ -369,6 +407,15 @@ public class QueueManager {
           userMaxApps, queueWeights, queuePolicies, minSharePreemptionTimeouts,
           userMaxApps, queueWeights, queuePolicies, minSharePreemptionTimeouts,
           queueAcls, queueNamesInAllocFile);
           queueAcls, queueNamesInAllocFile);
     }
     }
+    
+    // Load placement policy and pass it configured queues
+    if (placementPolicyElement != null) {
+      newPlacementPolicy = QueuePlacementPolicy.fromXml(placementPolicyElement,
+          new HashSet<String>(queueNamesInAllocFile), scheduler.getConf());
+    } else {
+      newPlacementPolicy = new QueuePlacementPolicy(getSimplePlacementRules(),
+          new HashSet<String>(queueNamesInAllocFile), scheduler.getConf());
+    }
 
 
     // Commit the reload; also create any queue defined in the alloc file
     // Commit the reload; also create any queue defined in the alloc file
     // if it does not already exist, so it can be displayed on the web UI.
     // if it does not already exist, so it can be displayed on the web UI.
@@ -377,6 +424,7 @@ public class QueueManager {
           queueMaxApps, userMaxApps, queueWeights, userMaxAppsDefault,
           queueMaxApps, userMaxApps, queueWeights, userMaxAppsDefault,
           queueMaxAppsDefault, defaultSchedPolicy, minSharePreemptionTimeouts,
           queueMaxAppsDefault, defaultSchedPolicy, minSharePreemptionTimeouts,
           queueAcls, fairSharePreemptionTimeout, defaultMinSharePreemptionTimeout);
           queueAcls, fairSharePreemptionTimeout, defaultMinSharePreemptionTimeout);
+      placementPolicy = newPlacementPolicy;
       
       
       // Make sure all queues exist
       // Make sure all queues exist
       for (String name: queueNamesInAllocFile) {
       for (String name: queueNamesInAllocFile) {

+ 121 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueuePlacementPolicy.java

@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.Groups;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.w3c.dom.Element;
+import org.w3c.dom.Node;
+import org.w3c.dom.NodeList;
+
+public class QueuePlacementPolicy {
+  private static final Map<String, Class<? extends QueuePlacementRule>> ruleClasses;
+  static {
+    Map<String, Class<? extends QueuePlacementRule>> map =
+        new HashMap<String, Class<? extends QueuePlacementRule>>();
+    map.put("user", QueuePlacementRule.User.class);
+    map.put("primaryGroup", QueuePlacementRule.PrimaryGroup.class);
+    map.put("specified", QueuePlacementRule.Specified.class);
+    map.put("default", QueuePlacementRule.Default.class);
+    map.put("reject", QueuePlacementRule.Reject.class);
+    ruleClasses = Collections.unmodifiableMap(map);
+  }
+  
+  private final List<QueuePlacementRule> rules;
+  private final Set<String> configuredQueues;
+  private final Groups groups;
+  
+  public QueuePlacementPolicy(List<QueuePlacementRule> rules,
+      Set<String> configuredQueues, Configuration conf)
+      throws AllocationConfigurationException {
+    for (int i = 0; i < rules.size()-1; i++) {
+      if (rules.get(i).isTerminal()) {
+        throw new AllocationConfigurationException("Rules after rule "
+            + i + " in queue placement policy can never be reached");
+      }
+    }
+    if (!rules.get(rules.size()-1).isTerminal()) {
+      throw new AllocationConfigurationException(
+          "Could get past last queue placement rule without assigning");
+    }
+    this.rules = rules;
+    this.configuredQueues = configuredQueues;
+    groups = new Groups(conf);
+  }
+  
+  /**
+   * Builds a QueuePlacementPolicy from an xml element.
+   */
+  public static QueuePlacementPolicy fromXml(Element el, Set<String> configuredQueues,
+      Configuration conf) throws AllocationConfigurationException {
+    List<QueuePlacementRule> rules = new ArrayList<QueuePlacementRule>();
+    NodeList elements = el.getChildNodes();
+    for (int i = 0; i < elements.getLength(); i++) {
+      Node node = elements.item(i);
+      if (node instanceof Element) {
+        Element element = (Element)node;
+        String ruleName = element.getTagName();
+        Class<? extends QueuePlacementRule> clazz = ruleClasses.get(ruleName);
+        if (clazz == null) {
+          throw new AllocationConfigurationException("No rule class found for "
+              + ruleName);
+        }
+        QueuePlacementRule rule = ReflectionUtils.newInstance(clazz, null);
+        rule.initializeFromXml(element);
+        rules.add(rule);
+      }
+    }
+    return new QueuePlacementPolicy(rules, configuredQueues, conf);
+  }
+  
+  /**
+   * Applies this rule to an app with the given requested queue and user/group
+   * information.
+   * 
+   * @param requestedQueue
+   *    The queue specified in the ApplicationSubmissionContext
+   * @param user
+   *    The user submitting the app
+   * @return
+   *    The name of the queue to assign the app to.  Or null if the app should
+   *    be rejected.
+   * @throws IOException
+   *    If an exception is encountered while getting the user's groups
+   */
+  public String assignAppToQueue(String requestedQueue, String user)
+      throws IOException {
+    for (QueuePlacementRule rule : rules) {
+      String queue = rule.assignAppToQueue(requestedQueue, user, groups,
+          configuredQueues);
+      if (queue == null || !queue.isEmpty()) {
+        return queue;
+      }
+    }
+    throw new IllegalStateException("Should have applied a rule before " +
+    		"reaching here");
+  }
+}

+ 200 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueuePlacementRule.java

@@ -0,0 +1,200 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.security.Groups;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.w3c.dom.Element;
+import org.w3c.dom.NamedNodeMap;
+import org.w3c.dom.Node;
+
+public abstract class QueuePlacementRule {
+  protected boolean create;
+  
+  /**
+   * Initializes the rule with any arguments.
+   * 
+   * @param args
+   *    Additional attributes of the rule's xml element other than create.
+   */
+  public QueuePlacementRule initialize(boolean create, Map<String, String> args) {
+    this.create = create;
+    return this;
+  }
+  
+  /**
+   * 
+   * @param requestedQueue
+   *    The queue explicitly requested.
+   * @param user
+   *    The user submitting the app.
+   * @param groups
+   *    The groups of the user submitting the app.
+   * @param configuredQueues
+   *    The queues specified in the scheduler configuration.
+   * @return
+   *    The queue to place the app into. An empty string indicates that we should
+   *    continue to the next rule, and null indicates that the app should be rejected.
+   */
+  public String assignAppToQueue(String requestedQueue, String user,
+      Groups groups, Collection<String> configuredQueues) throws IOException {
+    String queue = getQueueForApp(requestedQueue, user, groups);
+    if (create || configuredQueues.contains(queue)) {
+      return queue;
+    } else {
+      return "";
+    }
+  }
+  
+  public void initializeFromXml(Element el) {
+    boolean create = true;
+    NamedNodeMap attributes = el.getAttributes();
+    Map<String, String> args = new HashMap<String, String>();
+    for (int i = 0; i < attributes.getLength(); i++) {
+      Node node = attributes.item(i);
+      String key = node.getNodeName();
+      String value = node.getNodeValue();
+      if (key.equals("create")) {
+        create = Boolean.parseBoolean(value);
+      } else {
+        args.put(key, value);
+      }
+    }
+    initialize(create, args);
+  }
+  
+  /**
+   * Returns true if this rule never tells the policy to continue.
+   */
+  public abstract boolean isTerminal();
+  
+  /**
+   * Applies this rule to an app with the given requested queue and user/group
+   * information.
+   * 
+   * @param requestedQueue
+   *    The queue specified in the ApplicationSubmissionContext
+   * @param user
+   *    The user submitting the app.
+   * @param groups
+   *    The groups of the user submitting the app.
+   * @return
+   *    The name of the queue to assign the app to, or null to empty string
+   *    continue to the next rule.
+   */
+  protected abstract String getQueueForApp(String requestedQueue, String user,
+      Groups groups) throws IOException;
+
+  /**
+   * Places apps in queues by username of the submitter
+   */
+  public static class User extends QueuePlacementRule {
+    @Override
+    protected String getQueueForApp(String requestedQueue,
+        String user, Groups groups) {
+      return "root." + user;
+    }
+    
+    @Override
+    public boolean isTerminal() {
+      return create;
+    }
+  }
+  
+  /**
+   * Places apps in queues by primary group of the submitter
+   */
+  public static class PrimaryGroup extends QueuePlacementRule {
+    @Override
+    protected String getQueueForApp(String requestedQueue,
+        String user, Groups groups) throws IOException {
+      return "root." + groups.getGroups(user).get(0);
+    }
+    
+    @Override
+    public boolean isTerminal() {
+      return create;
+    }
+  }
+
+  /**
+   * Places apps in queues by requested queue of the submitter
+   */
+  public static class Specified extends QueuePlacementRule {
+    @Override
+    protected String getQueueForApp(String requestedQueue,
+        String user, Groups groups) {
+      if (requestedQueue.equals(YarnConfiguration.DEFAULT_QUEUE_NAME)) {
+        return "";
+      } else {
+        if (!requestedQueue.startsWith("root.")) {
+          requestedQueue = "root." + requestedQueue;
+        }
+        return requestedQueue;
+      }
+    }
+    
+    @Override
+    public boolean isTerminal() {
+      return false;
+    }
+  }
+  
+  /**
+   * Places all apps in the default queue
+   */
+  public static class Default extends QueuePlacementRule {
+    @Override
+    protected String getQueueForApp(String requestedQueue, String user,
+        Groups groups) {
+      return "root." + YarnConfiguration.DEFAULT_QUEUE_NAME;
+    }
+    
+    @Override
+    public boolean isTerminal() {
+      return create;
+    }
+  }
+  
+  /**
+   * Rejects all apps
+   */
+  public static class Reject extends QueuePlacementRule {
+    @Override
+    public String assignAppToQueue(String requestedQueue, String user,
+        Groups groups, Collection<String> configuredQueues) {
+      return null;
+    }
+    
+    @Override
+    protected String getQueueForApp(String requestedQueue, String user,
+        Groups groups) {
+      throw new UnsupportedOperationException();
+    }
+    
+    @Override
+    public boolean isTerminal() {
+      return true;
+    }
+  }
+}

+ 44 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SimpleGroupsMapping.java

@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.hadoop.security.GroupMappingServiceProvider;
+
+public class SimpleGroupsMapping implements GroupMappingServiceProvider {
+  
+  @Override
+  public List<String> getGroups(String user) {
+    return Arrays.asList(user + "group");
+  }
+
+  @Override
+  public void cacheGroupsRefresh() throws IOException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void cacheGroupsAdd(List<String> groups) throws IOException {
+    throw new UnsupportedOperationException();
+  }
+
+}

+ 45 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java

@@ -44,7 +44,9 @@ import javax.xml.parsers.ParserConfigurationException;
 import junit.framework.Assert;
 import junit.framework.Assert;
 
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.security.GroupMappingServiceProvider;
 import org.apache.hadoop.yarn.MockApps;
 import org.apache.hadoop.yarn.MockApps;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -94,6 +96,8 @@ import org.junit.Before;
 import org.junit.Test;
 import org.junit.Test;
 import org.xml.sax.SAXException;
 import org.xml.sax.SAXException;
 
 
+import com.google.common.collect.Sets;
+
 public class TestFairScheduler {
 public class TestFairScheduler {
 
 
   private class MockClock implements Clock {
   private class MockClock implements Clock {
@@ -616,6 +620,7 @@ public class TestFairScheduler {
 
 
     conf.set(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, "false");
     conf.set(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, "false");
     scheduler.reinitialize(conf, resourceManager.getRMContext());
     scheduler.reinitialize(conf, resourceManager.getRMContext());
+    scheduler.getQueueManager().initialize();
     AppAddedSchedulerEvent appAddedEvent2 = new AppAddedSchedulerEvent(
     AppAddedSchedulerEvent appAddedEvent2 = new AppAddedSchedulerEvent(
         createAppAttemptId(2, 1), "default", "user2");
         createAppAttemptId(2, 1), "default", "user2");
     scheduler.handle(appAddedEvent2);
     scheduler.handle(appAddedEvent2);
@@ -664,6 +669,46 @@ public class TestFairScheduler {
     assertEquals(rmApp2.getQueue(), queue2.getName());
     assertEquals(rmApp2.getQueue(), queue2.getName());
     assertEquals("root.notdefault", rmApp2.getQueue());
     assertEquals("root.notdefault", rmApp2.getQueue());
   }
   }
+  
+  @Test
+  public void testQueuePlacementWithPolicy() throws Exception {
+    Configuration conf = createConfiguration();
+    conf.setClass(CommonConfigurationKeys.HADOOP_SECURITY_GROUP_MAPPING,
+        SimpleGroupsMapping.class, GroupMappingServiceProvider.class);
+    ApplicationAttemptId appId;
+    Map<ApplicationAttemptId, FSSchedulerApp> apps = scheduler.applications;
+
+    List<QueuePlacementRule> rules = new ArrayList<QueuePlacementRule>();
+    rules.add(new QueuePlacementRule.Specified().initialize(true, null));
+    rules.add(new QueuePlacementRule.User().initialize(false, null));
+    rules.add(new QueuePlacementRule.PrimaryGroup().initialize(false, null));
+    rules.add(new QueuePlacementRule.Default().initialize(true, null));
+    Set<String> queues = Sets.newHashSet("root.user1", "root.user3group");
+    scheduler.getQueueManager().placementPolicy = new QueuePlacementPolicy(
+        rules, queues, conf);
+    appId = createSchedulingRequest(1024, "somequeue", "user1");
+    assertEquals("root.somequeue", apps.get(appId).getQueueName());
+    appId = createSchedulingRequest(1024, "default", "user1");
+    assertEquals("root.user1", apps.get(appId).getQueueName());
+    appId = createSchedulingRequest(1024, "default", "user3");
+    assertEquals("root.user3group", apps.get(appId).getQueueName());
+    appId = createSchedulingRequest(1024, "default", "otheruser");
+    assertEquals("root.default", apps.get(appId).getQueueName());
+    
+    // test without specified as first rule
+    rules = new ArrayList<QueuePlacementRule>();
+    rules.add(new QueuePlacementRule.User().initialize(false, null));
+    rules.add(new QueuePlacementRule.Specified().initialize(true, null));
+    rules.add(new QueuePlacementRule.Default().initialize(true, null));
+    scheduler.getQueueManager().placementPolicy = new QueuePlacementPolicy(
+        rules, queues, conf);
+    appId = createSchedulingRequest(1024, "somequeue", "user1");
+    assertEquals("root.user1", apps.get(appId).getQueueName());
+    appId = createSchedulingRequest(1024, "somequeue", "otheruser");
+    assertEquals("root.somequeue", apps.get(appId).getQueueName());
+    appId = createSchedulingRequest(1024, "default", "otheruser");
+    assertEquals("root.default", apps.get(appId).getQueueName());
+  }
 
 
   @Test
   @Test
   public void testFairShareWithMinAlloc() throws Exception {
   public void testFairShareWithMinAlloc() throws Exception {

+ 119 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestQueuePlacementPolicy.java

@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Set;
+
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.security.GroupMappingServiceProvider;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+
+import com.google.common.collect.Sets;
+
+public class TestQueuePlacementPolicy {
+  private final static Configuration conf = new Configuration();
+  private final static Set<String> configuredQueues = Sets.newHashSet("root.someuser");
+  
+  @BeforeClass
+  public static void setup() {
+    conf.setClass(CommonConfigurationKeys.HADOOP_SECURITY_GROUP_MAPPING,
+        SimpleGroupsMapping.class, GroupMappingServiceProvider.class);
+  }
+  
+  @Test
+  public void testSpecifiedUserPolicy() throws Exception {
+    StringBuffer sb = new StringBuffer();
+    sb.append("<queuePlacementPolicy>");
+    sb.append("  <specified />");
+    sb.append("  <user />");
+    sb.append("</queuePlacementPolicy>");
+    QueuePlacementPolicy policy = parse(sb.toString());
+    assertEquals("root.specifiedq",policy.assignAppToQueue("specifiedq", "someuser"));
+    assertEquals("root.someuser", policy.assignAppToQueue("default", "someuser"));
+    assertEquals("root.otheruser", policy.assignAppToQueue("default", "otheruser"));
+  }
+  
+  @Test
+  public void testNoCreate() throws Exception {
+    StringBuffer sb = new StringBuffer();
+    sb.append("<queuePlacementPolicy>");
+    sb.append("  <specified />");
+    sb.append("  <user create=\"false\" />");
+    sb.append("  <default />");
+    sb.append("</queuePlacementPolicy>");
+    QueuePlacementPolicy policy = parse(sb.toString());
+    assertEquals("root.specifiedq", policy.assignAppToQueue("specifiedq", "someuser"));
+    assertEquals("root.someuser", policy.assignAppToQueue("default", "someuser"));
+    assertEquals("root.specifiedq", policy.assignAppToQueue("specifiedq", "otheruser"));
+    assertEquals("root.default", policy.assignAppToQueue("default", "otheruser"));
+  }
+  
+  @Test
+  public void testSpecifiedThenReject() throws Exception {
+    StringBuffer sb = new StringBuffer();
+    sb.append("<queuePlacementPolicy>");
+    sb.append("  <specified />");
+    sb.append("  <reject />");
+    sb.append("</queuePlacementPolicy>");
+    QueuePlacementPolicy policy = parse(sb.toString());
+    assertEquals("root.specifiedq", policy.assignAppToQueue("specifiedq", "someuser"));
+    assertEquals(null, policy.assignAppToQueue("default", "someuser"));
+  }
+  
+  @Test (expected = AllocationConfigurationException.class)
+  public void testOmittedTerminalRule() throws Exception {
+    StringBuffer sb = new StringBuffer();
+    sb.append("<queuePlacementPolicy>");
+    sb.append("  <specified />");
+    sb.append("  <user create=\"false\" />");
+    sb.append("</queuePlacementPolicy>");
+    parse(sb.toString());
+  }
+  
+  @Test (expected = AllocationConfigurationException.class)
+  public void testTerminalRuleInMiddle() throws Exception {
+    StringBuffer sb = new StringBuffer();
+    sb.append("<queuePlacementPolicy>");
+    sb.append("  <specified />");
+    sb.append("  <default />");
+    sb.append("  <user />");
+    sb.append("</queuePlacementPolicy>");
+    parse(sb.toString());
+  }
+  
+  private QueuePlacementPolicy parse(String str) throws Exception {
+    // Read and parse the allocations file.
+    DocumentBuilderFactory docBuilderFactory =
+      DocumentBuilderFactory.newInstance();
+    docBuilderFactory.setIgnoringComments(true);
+    DocumentBuilder builder = docBuilderFactory.newDocumentBuilder();
+    Document doc = builder.parse(IOUtils.toInputStream(str));
+    Element root = doc.getDocumentElement();
+    return QueuePlacementPolicy.fromXml(root, configuredQueues, conf);
+  }
+}

+ 70 - 18
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/FairScheduler.apt.vm

@@ -101,6 +101,16 @@ Hadoop MapReduce Next Generation - Fair Scheduler
   Fair Scheduler. Among them, is the use of a custom policies governing 
   Fair Scheduler. Among them, is the use of a custom policies governing 
   priority “boosting” over  certain apps. 
   priority “boosting” over  certain apps. 
 
 
+* {Automatically placing applications in queues}
+
+  The Fair Scheduler allows administrators to configure policies that
+  automatically place submitted applications into appropriate queues. Placement
+  can depend on the user and groups of the submitter and the requested queue
+  passed by the application. A policy consists of a set of rules that are applied
+  sequentially to classify an incoming application. Each rule either places the
+  app into a queue, rejects it, or continues on to the next rule. Refer to the
+  allocation file format below for how to configure these policies.
+
 * {Installation}
 * {Installation}
 
 
   To use the Fair Scheduler first assign the appropriate scheduler class in 
   To use the Fair Scheduler first assign the appropriate scheduler class in 
@@ -138,7 +148,8 @@ Properties that can be placed in yarn-site.xml
     * Whether to use the username associated with the allocation as the default 
     * Whether to use the username associated with the allocation as the default 
       queue name, in the event that a queue name is not specified. If this is set 
       queue name, in the event that a queue name is not specified. If this is set 
       to "false" or unset, all jobs have a shared default queue, named "default".
       to "false" or unset, all jobs have a shared default queue, named "default".
-      Defaults to true.
+      Defaults to true.  If a queue placement policy is given in the allocations
+      file, this property is ignored.
 
 
  * <<<yarn.scheduler.fair.preemption>>>
  * <<<yarn.scheduler.fair.preemption>>>
 
 
@@ -180,6 +191,16 @@ Properties that can be placed in yarn-site.xml
       opportunities to pass up. The default value of -1.0 means don't pass up any
       opportunities to pass up. The default value of -1.0 means don't pass up any
       scheduling opportunities.
       scheduling opportunities.
 
 
+ * <<<yarn.scheduler.fair.allow-undeclared-pools>>>
+
+    * If this is true, new queues can be created at application submission time,
+      whether because they are specified as the application's queue by the
+      submitter or because they are placed there by the user-as-default-queue
+      property. If this is false, any time an app would be placed in a queue that
+      is not specified in the allocations file, it is placed in the "default" queue
+      instead. Defaults to true. If a queue placement policy is given in the
+      allocations file, this property is ignored.
+
 Allocation file format
 Allocation file format
 
 
   The allocation file must be in XML format. The format contains five types of
   The allocation file must be in XML format. The format contains five types of
@@ -248,25 +269,29 @@ Allocation file format
    policy for queues; overriden by the schedulingPolicy element in each queue
    policy for queues; overriden by the schedulingPolicy element in each queue
    if specified. Defaults to "fair".
    if specified. Defaults to "fair".
 
 
-  An example allocation file is given here:
+ * <<A queuePlacementPolicy element>>, which contains a list of rule elements
+   that tell the scheduler how to place incoming apps into queues. Rules
+   are applied in the order that they are listed. Rules may take arguments. All
+   rules accept the "create" argument, which indicates whether the rule can create
+   a new queue. "Create" defaults to true; if set to false and the rule would
+   place the app in a queue that is not configured in the allocations file, we
+   continue on to the next rule. The last rule must be one that can never issue a
+   continue.  Valid rules are:
 
 
-Queue Access Control Lists (ACLs)
+     * specified: the app is placed into the queue it requested.  If the app
+       requested no queue, i.e. it specified "default", we continue.
 
 
-  Queue Access Control Lists (ACLs) allow administrators to control who may
-  take actions on particular queues. They are configured with the aclSubmitApps
-  and aclAdministerApps properties, which can be set per queue. Currently the
-  only supported administrative action is killing an application. Anybody who
-  may administer a queue may also submit applications to it. These properties
-  take values in a format like "user1,user2 group1,group2" or " group1,group2".
-  An action on a queue will be permitted if its user or group is in the ACL of
-  that queue or in the ACL of any of that queue's ancestors. So if queue2
-  is inside queue1, and user1 is in queue1's ACL, and user2 is in queue2's
-  ACL, then both users may submit to queue2.
-  
-  The root queue's ACLs are "*" by default which, because ACLs are passed down,
-  means that everybody may submit to and kill applications from every queue.
-  To start restricting access, change the root queue's ACLs to something other
-  than "*". 
+     * user: the app is placed into a queue with the name of the user who
+       submitted it.
+
+     * primaryGroup: the app is placed into a queue with the name of the
+       primary group of the user who submitted it.
+
+     * default: the app is placed into the queue named "default".
+
+     * reject: the app is rejected.
+
+  An example allocation file is given here:
 
 
 ---
 ---
 <?xml version="1.0"?>
 <?xml version="1.0"?>
@@ -282,14 +307,41 @@ Queue Access Control Lists (ACLs)
       <minResources>5000 mb,0vcores</minResources>
       <minResources>5000 mb,0vcores</minResources>
     </queue>
     </queue>
   </queue>
   </queue>
+  
   <user name="sample_user">
   <user name="sample_user">
     <maxRunningApps>30</maxRunningApps>
     <maxRunningApps>30</maxRunningApps>
   </user>
   </user>
   <userMaxAppsDefault>5</userMaxAppsDefault>
   <userMaxAppsDefault>5</userMaxAppsDefault>
+  
+  <queuePlacementPolicy>
+    <specified />
+    <primarygroup create="false" />
+    <default />
+  </queuePlacementPolicy>
 </allocations>
 </allocations>
 ---
 ---
 
 
   Note that for backwards compatibility with the original FairScheduler, "queue" elements can instead be named as "pool" elements.
   Note that for backwards compatibility with the original FairScheduler, "queue" elements can instead be named as "pool" elements.
+
+
+Queue Access Control Lists (ACLs)
+
+  Queue Access Control Lists (ACLs) allow administrators to control who may
+  take actions on particular queues. They are configured with the aclSubmitApps
+  and aclAdministerApps properties, which can be set per queue. Currently the
+  only supported administrative action is killing an application. Anybody who
+  may administer a queue may also submit applications to it. These properties
+  take values in a format like "user1,user2 group1,group2" or " group1,group2".
+  An action on a queue will be permitted if its user or group is in the ACL of
+  that queue or in the ACL of any of that queue's ancestors. So if queue2
+  is inside queue1, and user1 is in queue1's ACL, and user2 is in queue2's
+  ACL, then both users may submit to queue2.
+  
+  The root queue's ACLs are "*" by default which, because ACLs are passed down,
+  means that everybody may submit to and kill applications from every queue.
+  To start restricting access, change the root queue's ACLs to something other
+  than "*". 
+
   
   
 * {Administration}
 * {Administration}