Browse Source

YARN-1707. Introduce APIs to add/remove/resize queues in the CapacityScheduler. Contributed by Carlo Curino and Subru Krishnan

carlo curino 10 years ago
parent
commit
aac47fda7f
13 changed files with 1012 additions and 28 deletions
  1. 25 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
  2. 13 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerDynamicEditException.java
  3. 42 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java
  4. 190 19
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
  5. 22 6
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
  6. 6 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
  7. 193 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java
  8. 98 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ReservationQueue.java
  9. 28 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/QueueEntitlement.java
  10. 9 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/CapacitySchedulerQueueInfo.java
  11. 1 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
  12. 282 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerDynamicBehavior.java
  13. 103 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservationQueue.java

+ 25 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java

@@ -51,6 +51,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMoveEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerFinishedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
@@ -220,6 +221,24 @@ public abstract class AbstractYarnScheduler
         + " does not support moving apps between queues");
   }
 
+  public void removeQueue(String queueName) throws YarnException {
+    throw new YarnException(getClass().getSimpleName()
+        + " does not support removing queues");
+  }
+
+  @Override
+  public void addQueue(Queue newQueue) throws YarnException {
+    throw new YarnException(getClass().getSimpleName()
+        + " does not support this operation");
+  }
+
+  @Override
+  public void setEntitlement(String queue, QueueEntitlement entitlement)
+      throws YarnException {
+    throw new YarnException(getClass().getSimpleName()
+        + " does not support this operation");
+  }
+
   private void killOrphanContainerOnNode(RMNode node,
       NMContainerStatus container) {
     if (!container.getContainerState().equals(ContainerState.COMPLETE)) {
@@ -503,4 +522,10 @@ public abstract class AbstractYarnScheduler
   public EnumSet<SchedulerResourceTypes> getSchedulingResourceTypes() {
     return EnumSet.of(SchedulerResourceTypes.MEMORY);
   }
+
+  @Override
+  public Set<String> getPlanQueues() throws YarnException {
+    throw new YarnException(getClass().getSimpleName()
+        + " does not support reservations");
+  }
 }

+ 13 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerDynamicEditException.java

@@ -0,0 +1,13 @@
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
+
+import org.apache.hadoop.yarn.exceptions.YarnException;
+
+public class SchedulerDynamicEditException extends YarnException {
+
+  private static final long serialVersionUID = 7100374511387193257L;
+
+  public SchedulerDynamicEditException(String string) {
+    super(string);
+  }
+
+}

+ 42 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java

@@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
 import java.io.IOException;
 import java.util.EnumSet;
 import java.util.List;
+import java.util.Set;
 
 import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
 import org.apache.hadoop.classification.InterfaceAudience.Public;
@@ -41,6 +42,7 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
 import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes;
 
@@ -223,6 +225,46 @@ public interface YarnScheduler extends EventHandler<SchedulerEvent> {
    */
   void killAllAppsInQueue(String queueName) throws YarnException;
 
+  /**
+   * Remove an existing queue. Implementations might limit when a queue could be
+   * removed (e.g., must have zero entitlement, and no applications running, or
+   * must be a leaf, etc..).
+   *
+   * @param queueName name of the queue to remove
+   * @throws YarnException
+   */
+  void removeQueue(String queueName) throws YarnException;
+
+  /**
+   * Add to the scheduler a new Queue. Implementations might limit what type of
+   * queues can be dynamically added (e.g., Queue must be a leaf, must be
+   * attached to existing parent, must have zero entitlement).
+   *
+   * @param newQueue the queue being added.
+   * @throws YarnException
+   */
+  void addQueue(Queue newQueue) throws YarnException;
+
+  /**
+   * This method increase the entitlement for current queue (must respect
+   * invariants, e.g., no overcommit of parents, non negative, etc.).
+   * Entitlement is a general term for weights in FairScheduler, capacity for
+   * the CapacityScheduler, etc.
+   *
+   * @param queue the queue for which we change entitlement
+   * @param entitlement the new entitlement for the queue (capacity,
+   *              maxCapacity, etc..)
+   * @throws YarnException
+   */
+  void setEntitlement(String queue, QueueEntitlement entitlement)
+      throws YarnException;
+
+  /**
+   * Gets the list of names for queues managed by the Reservation System
+   * @return the list of queues which support reservations
+   */
+  public Set<String> getPlanQueues() throws YarnException;  
+
   /**
    * Return a collection of the resource types that are considered when
    * scheduling

+ 190 - 19
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java

@@ -24,6 +24,8 @@ import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.HashSet;
+import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -90,6 +92,11 @@ import org.apache.hadoop.yarn.util.resource.Resources;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 
+import org.apache.hadoop.yarn.api.records.ReservationId;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
+
 @LimitedPrivate("yarn")
 @Evolving
 @SuppressWarnings("unchecked")
@@ -473,9 +480,12 @@ public class CapacityScheduler extends
   private void validateExistingQueues(
       Map<String, CSQueue> queues, Map<String, CSQueue> newQueues) 
   throws IOException {
-    for (String queue : queues.keySet()) {
-      if (!newQueues.containsKey(queue)) {
-        throw new IOException(queue + " cannot be found during refresh!");
+    // check that all static queues are included in the newQueues list
+    for (Map.Entry<String, CSQueue> e : queues.entrySet()) {
+      if (!(e.getValue() instanceof ReservationQueue)) {
+        if (!newQueues.containsKey(e.getKey())) {
+          throw new IOException(e.getKey() + " cannot be found during refresh!");
+        }
       }
     }
   }
@@ -507,26 +517,42 @@ public class CapacityScheduler extends
       Map<String, CSQueue> oldQueues, 
       QueueHook hook) throws IOException {
     CSQueue queue;
+    String fullQueueName =
+        (parent == null) ? queueName
+            : (parent.getQueuePath() + "." + queueName);
     String[] childQueueNames = 
-      conf.getQueues((parent == null) ? 
-          queueName : (parent.getQueuePath()+"."+queueName));
+      conf.getQueues(fullQueueName);
+    boolean isReservableQueue = conf.isReservableQueue(fullQueueName);
     if (childQueueNames == null || childQueueNames.length == 0) {
       if (null == parent) {
         throw new IllegalStateException(
             "Queue configuration missing child queue names for " + queueName);
       }
-      queue = 
-          new LeafQueue(csContext, queueName, parent,oldQueues.get(queueName));
-      
-      // Used only for unit tests
-      queue = hook.hook(queue);
+      // Check if the queue will be dynamically managed by the Reservation
+      // system
+     if (isReservableQueue) {
+        queue =
+            new PlanQueue(csContext, queueName, parent,
+                oldQueues.get(queueName));
+      } else {
+        queue =
+            new LeafQueue(csContext, queueName, parent,
+                oldQueues.get(queueName));
+
+        // Used only for unit tests
+        queue = hook.hook(queue);
+      }
     } else {
+      if (isReservableQueue) {
+        throw new IllegalStateException(
+            "Only Leaf Queues can be reservable for " + queueName);
+      }
       ParentQueue parentQueue = 
         new ParentQueue(csContext, queueName, parent,oldQueues.get(queueName));
 
       // Used only for unit tests
       queue = hook.hook(parentQueue);
-      
+
       List<CSQueue> childQueues = new ArrayList<CSQueue>();
       for (String childQueueName : childQueueNames) {
         CSQueue childQueue = 
@@ -548,7 +574,7 @@ public class CapacityScheduler extends
     return queue;
   }
 
-  synchronized CSQueue getQueue(String queueName) {
+  public synchronized CSQueue getQueue(String queueName) {
     if (queueName == null) {
       return null;
     }
@@ -716,7 +742,7 @@ public class CapacityScheduler extends
       ApplicationAttemptId applicationAttemptId,
       RMAppAttemptState rmAppAttemptFinalState, boolean keepContainers) {
     LOG.info("Application Attempt " + applicationAttemptId + " is done." +
-    		" finalState=" + rmAppAttemptFinalState);
+        " finalState=" + rmAppAttemptFinalState);
     
     FiCaSchedulerApp attempt = getApplicationAttempt(applicationAttemptId);
     SchedulerApplication<FiCaSchedulerApp> application =
@@ -995,9 +1021,16 @@ public class CapacityScheduler extends
     case APP_ADDED:
     {
       AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent) event;
-      addApplication(appAddedEvent.getApplicationId(),
-        appAddedEvent.getQueue(),
-        appAddedEvent.getUser(), appAddedEvent.getIsAppRecovering());
+      String queueName =
+          resolveReservationQueueName(appAddedEvent.getQueue(),
+              appAddedEvent.getApplicationId(),
+              appAddedEvent.getReservationID());
+      if (queueName != null) {
+        addApplication(appAddedEvent.getApplicationId(),
+            queueName,
+            appAddedEvent.getUser(),
+            appAddedEvent.getIsAppRecovering());
+      }
     }
     break;
     case APP_REMOVED:
@@ -1230,6 +1263,123 @@ public class CapacityScheduler extends
     }
   }
 
+  private synchronized String resolveReservationQueueName(String queueName,
+      ApplicationId applicationId, ReservationId reservationID) {
+    CSQueue queue = getQueue(queueName);
+    // Check if the queue is a plan queue
+    if ((queue == null) || !(queue instanceof PlanQueue)) {
+      return queueName;
+    }
+    if (reservationID != null) {
+      String resQName = reservationID.toString();
+      queue = getQueue(resQName);
+      if (queue == null) {
+        String message =
+            "Application "
+                + applicationId
+                + " submitted to a reservation which is not yet currently active: "
+                + resQName;
+        this.rmContext.getDispatcher().getEventHandler()
+            .handle(new RMAppRejectedEvent(applicationId, message));
+        return null;
+      }
+      // use the reservation queue to run the app
+      queueName = resQName;
+    } else {
+      // use the default child queue of the plan for unreserved apps
+      queueName = queueName + PlanQueue.DEFAULT_QUEUE_SUFFIX;
+    }
+    return queueName;
+  }
+
+  @Override
+  public synchronized void removeQueue(String queueName)
+      throws SchedulerDynamicEditException {
+    LOG.info("Removing queue: " + queueName);
+    CSQueue q = this.getQueue(queueName);
+    if (!(q instanceof ReservationQueue)) {
+      throw new SchedulerDynamicEditException("The queue that we are asked "
+          + "to remove (" + queueName + ") is not a ReservationQueue");
+    }
+    ReservationQueue disposableLeafQueue = (ReservationQueue) q;
+    // at this point we should have no more apps
+    if (disposableLeafQueue.getNumApplications() > 0) {
+      throw new SchedulerDynamicEditException("The queue " + queueName
+          + " is not empty " + disposableLeafQueue.getApplications().size()
+          + " active apps " + disposableLeafQueue.pendingApplications.size()
+          + " pending apps");
+    }
+
+    ((PlanQueue) disposableLeafQueue.getParent()).removeChildQueue(q);
+    this.queues.remove(queueName);
+    LOG.info("Removal of ReservationQueue " + queueName + " has succeeded");
+  }
+
+  @Override
+  public synchronized void addQueue(Queue queue)
+      throws SchedulerDynamicEditException {
+
+    if (!(queue instanceof ReservationQueue)) {
+      throw new SchedulerDynamicEditException("Queue " + queue.getQueueName()
+          + " is not a ReservationQueue");
+    }
+
+    ReservationQueue newQueue = (ReservationQueue) queue;
+
+    if (newQueue.getParent() == null
+        || !(newQueue.getParent() instanceof PlanQueue)) {
+      throw new SchedulerDynamicEditException("ParentQueue for "
+          + newQueue.getQueueName()
+          + " is not properly set (should be set and be a PlanQueue)");
+    }
+
+    PlanQueue parentPlan = (PlanQueue) newQueue.getParent();
+    String queuename = newQueue.getQueueName();
+    parentPlan.addChildQueue(newQueue);
+    this.queues.put(queuename, newQueue);
+    LOG.info("Creation of ReservationQueue " + newQueue + " succeeded");
+  }
+
+  @Override
+  public synchronized void setEntitlement(String inQueue,
+      QueueEntitlement entitlement) throws SchedulerDynamicEditException,
+      YarnException {
+    LeafQueue queue = getAndCheckLeafQueue(inQueue);
+    ParentQueue parent = (ParentQueue) queue.getParent();
+
+    if (!(queue instanceof ReservationQueue)) {
+      throw new SchedulerDynamicEditException("Entitlement can not be"
+          + " modified dynamically since queue " + inQueue
+          + " is not a ReservationQueue");
+    }
+
+    if (!(parent instanceof PlanQueue)) {
+      throw new SchedulerDynamicEditException("The parent of ReservationQueue "
+          + inQueue + " must be an PlanQueue");
+    }
+
+    ReservationQueue newQueue = (ReservationQueue) queue;
+
+    float sumChilds = ((PlanQueue) parent).sumOfChildCapacities();
+    float newChildCap = sumChilds - queue.getCapacity() + entitlement.getCapacity();
+
+    if (newChildCap >= 0 && newChildCap < 1.0f + CSQueueUtils.EPSILON) {
+      // note: epsilon checks here are not ok, as the epsilons might accumulate
+      // and become a problem in aggregate
+      if (Math.abs(entitlement.getCapacity() - queue.getCapacity()) == 0
+          && Math.abs(entitlement.getMaxCapacity() - queue.getMaximumCapacity()) == 0) {
+        return;
+      }
+      newQueue.setEntitlement(entitlement);
+    } else {
+      throw new SchedulerDynamicEditException(
+          "Sum of child queues would exceed 100% for PlanQueue: "
+              + parent.getQueueName());
+    }
+    LOG.info("Set entitlement for ReservationQueue " + inQueue + "  to "
+        + queue.getCapacity() + " request was (" + entitlement.getCapacity() + ")");
+  }
+
   @Override
   public synchronized String moveApplication(ApplicationId appId,
       String targetQueueName) throws YarnException {
@@ -1237,11 +1387,12 @@ public class CapacityScheduler extends
         getApplicationAttempt(ApplicationAttemptId.newInstance(appId, 0));
     String sourceQueueName = app.getQueue().getQueueName();
     LeafQueue source = getAndCheckLeafQueue(sourceQueueName);
-    LeafQueue dest = getAndCheckLeafQueue(targetQueueName);
+    String destQueueName = handleMoveToPlanQueue(targetQueueName);
+    LeafQueue dest = getAndCheckLeafQueue(destQueueName);
     // Validation check - ACLs, submission limits for user & queue
     String user = app.getUser();
     try {
-      dest.submitApplication(appId, user, targetQueueName);
+      dest.submitApplication(appId, user, destQueueName);
     } catch (AccessControlException e) {
       throw new YarnException(e);
     }
@@ -1260,7 +1411,7 @@ public class CapacityScheduler extends
     dest.submitApplicationAttempt(app, user);
     applications.get(appId).setQueue(dest);
     LOG.info("App: " + app.getApplicationId() + " successfully moved from "
-        + sourceQueueName + " to: " + targetQueueName);
+        + sourceQueueName + " to: " + destQueueName);
     return targetQueueName;
   }
 
@@ -1295,4 +1446,24 @@ public class CapacityScheduler extends
     return EnumSet
       .of(SchedulerResourceTypes.MEMORY, SchedulerResourceTypes.CPU);
   }
+  
+  private String handleMoveToPlanQueue(String targetQueueName) {
+    CSQueue dest = getQueue(targetQueueName);
+    if (dest != null && dest instanceof PlanQueue) {
+      // use the default child reservation queue of the plan
+      targetQueueName = targetQueueName + PlanQueue.DEFAULT_QUEUE_SUFFIX;
+    }
+    return targetQueueName;
+  }
+
+  @Override
+  public Set<String> getPlanQueues() {
+    Set<String> ret = new HashSet<String>();
+    for (Map.Entry<String, CSQueue> l : queues.entrySet()) {
+      if (l.getValue() instanceof PlanQueue) {
+        ret.add(l.getKey());
+      }
+    }
+    return ret;
+  }
 }

+ 22 - 6
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java

@@ -85,8 +85,8 @@ public class LeafQueue implements CSQueue {
   private int userLimit;
   private float userLimitFactor;
 
-  private int maxApplications;
-  private int maxApplicationsPerUser;
+  protected int maxApplications;
+  protected int maxApplicationsPerUser;
   
   private float maxAMResourcePerQueuePercent;
   private int maxActiveApplications; // Based on absolute max capacity
@@ -150,8 +150,7 @@ public class LeafQueue implements CSQueue {
             Resources.subtract(maximumAllocation, minimumAllocation), 
             maximumAllocation);
 
-    float capacity = 
-      (float)cs.getConfiguration().getCapacity(getQueuePath()) / 100;
+    float capacity = getCapacityFromConf();
     float absoluteCapacity = parent.getAbsoluteCapacity() * capacity;
 
     float maximumCapacity = 
@@ -217,6 +216,11 @@ public class LeafQueue implements CSQueue {
     this.activeApplications = new TreeSet<FiCaSchedulerApp>(applicationComparator);
   }
 
+  // externalizing in method, to allow overriding
+  protected float getCapacityFromConf() {
+    return (float)scheduler.getConfiguration().getCapacity(getQueuePath()) / 100;
+  }
+
   private synchronized void setupQueueConfigs(
       Resource clusterResource,
       float capacity, float absoluteCapacity, 
@@ -475,7 +479,7 @@ public class LeafQueue implements CSQueue {
    * Set user limit factor - used only for testing.
    * @param userLimitFactor new user limit factor
    */
-  synchronized void setUserLimitFactor(int userLimitFactor) {
+  synchronized void setUserLimitFactor(float userLimitFactor) {
     this.userLimitFactor = userLimitFactor;
   }
 
@@ -817,7 +821,7 @@ public class LeafQueue implements CSQueue {
           getApplication(reservedContainer.getApplicationAttemptId());
       synchronized (application) {
         return assignReservedContainer(application, node, reservedContainer,
-          clusterResource);
+            clusterResource);
       }
     }
     
@@ -1661,4 +1665,16 @@ public class LeafQueue implements CSQueue {
       getParent().detachContainer(clusterResource, application, rmContainer);
     }
   }
+
+  public void setCapacity(float capacity) {
+    this.capacity = capacity;
+  }
+
+  public void setAbsoluteCapacity(float absoluteCapacity) {
+    this.absoluteCapacity = absoluteCapacity;
+  }
+
+  public void setMaxApplications(int maxApplications) {
+    this.maxApplications = maxApplications;
+  }
 }

+ 6 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java

@@ -75,7 +75,7 @@ public class ParentQueue implements CSQueue {
 
   private float usedCapacity = 0.0f;
 
-  private final Set<CSQueue> childQueues;
+  protected final Set<CSQueue> childQueues;
   private final Comparator<CSQueue> queueComparator;
   
   private Resource usedResources = Resources.createResource(0, 0);
@@ -156,7 +156,7 @@ public class ParentQueue implements CSQueue {
         ", fullname=" + getQueuePath()); 
   }
 
-  private synchronized void setupQueueConfigs(
+  protected synchronized void setupQueueConfigs(
       Resource clusterResource,
       float capacity, float absoluteCapacity, 
       float maximumCapacity, float absoluteMaxCapacity,
@@ -824,4 +824,8 @@ public class ParentQueue implements CSQueue {
       }
     }
   }
+
+  public Map<QueueACL, AccessControlList> getACLs() {
+    return acls;
+  }
 }

+ 193 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java

@@ -0,0 +1,193 @@
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This represents a dynamic queue managed by the {@link ReservationSystem}.
+ * From the user perspective this is equivalent to a LeafQueue that respect
+ * reservations, but functionality wise is a sub-class of ParentQueue
+ *
+ */
+public class PlanQueue extends ParentQueue {
+
+  public static final String DEFAULT_QUEUE_SUFFIX = "-default";
+
+  private static final Logger LOG = LoggerFactory.getLogger(PlanQueue.class);
+
+  private int maxAppsForReservation;
+  private int maxAppsPerUserForReservation;
+  private int userLimit;
+  private float userLimitFactor;
+  protected CapacitySchedulerContext schedulerContext;
+  private boolean showReservationsAsQueues;
+
+  public PlanQueue(CapacitySchedulerContext cs, String queueName,
+      CSQueue parent, CSQueue old) {
+    super(cs, queueName, parent, old);
+
+    this.schedulerContext = cs;
+    // Set the reservation queue attributes for the Plan
+    CapacitySchedulerConfiguration conf = cs.getConfiguration();
+    String queuePath = super.getQueuePath();
+    int maxAppsForReservation = conf.getMaximumApplicationsPerQueue(queuePath);
+    showReservationsAsQueues = conf.getShowReservationAsQueues(queuePath);
+    if (maxAppsForReservation < 0) {
+      maxAppsForReservation =
+          (int) (CapacitySchedulerConfiguration.DEFAULT_MAXIMUM_SYSTEM_APPLICATIIONS * super
+              .getAbsoluteCapacity());
+    }
+    int userLimit = conf.getUserLimit(queuePath);
+    float userLimitFactor = conf.getUserLimitFactor(queuePath);
+    int maxAppsPerUserForReservation =
+        (int) (maxAppsForReservation * (userLimit / 100.0f) * userLimitFactor);
+    updateQuotas(userLimit, userLimitFactor, maxAppsForReservation,
+        maxAppsPerUserForReservation);
+
+    StringBuffer queueInfo = new StringBuffer();
+    queueInfo.append("Created Plan Queue: ").append(queueName)
+        .append("\nwith capacity: [").append(super.getCapacity())
+        .append("]\nwith max capacity: [").append(super.getMaximumCapacity())
+        .append("\nwith max reservation apps: [").append(maxAppsForReservation)
+        .append("]\nwith max reservation apps per user: [")
+        .append(maxAppsPerUserForReservation).append("]\nwith user limit: [")
+        .append(userLimit).append("]\nwith user limit factor: [")
+        .append(userLimitFactor).append("].");
+    LOG.info(queueInfo.toString());
+  }
+
+  @Override
+  public synchronized void reinitialize(CSQueue newlyParsedQueue,
+      Resource clusterResource) throws IOException {
+    // Sanity check
+    if (!(newlyParsedQueue instanceof PlanQueue)
+        || !newlyParsedQueue.getQueuePath().equals(getQueuePath())) {
+      throw new IOException("Trying to reinitialize " + getQueuePath()
+          + " from " + newlyParsedQueue.getQueuePath());
+    }
+
+    PlanQueue newlyParsedParentQueue = (PlanQueue) newlyParsedQueue;
+
+    if (newlyParsedParentQueue.getChildQueues().size() > 0) {
+      throw new IOException(
+          "Reservable Queue should not have sub-queues in the"
+              + "configuration");
+    }
+
+    // Set new configs
+    setupQueueConfigs(clusterResource, newlyParsedParentQueue.getCapacity(),
+        newlyParsedParentQueue.getAbsoluteCapacity(),
+        newlyParsedParentQueue.getMaximumCapacity(),
+        newlyParsedParentQueue.getAbsoluteMaximumCapacity(),
+        newlyParsedParentQueue.getState(), newlyParsedParentQueue.getACLs());
+
+    updateQuotas(newlyParsedParentQueue.userLimit,
+        newlyParsedParentQueue.userLimitFactor,
+        newlyParsedParentQueue.maxAppsForReservation,
+        newlyParsedParentQueue.maxAppsPerUserForReservation);
+
+    // run reinitialize on each existing queue, to trigger absolute cap
+    // recomputations
+    for (CSQueue res : this.getChildQueues()) {
+      res.reinitialize(res, clusterResource);
+    }
+    showReservationsAsQueues = newlyParsedParentQueue.showReservationsAsQueues;
+  }
+
+  synchronized void addChildQueue(CSQueue newQueue)
+      throws SchedulerDynamicEditException {
+    if (newQueue.getCapacity() > 0) {
+      throw new SchedulerDynamicEditException("Queue " + newQueue
+          + " being added has non zero capacity.");
+    }
+    boolean added = this.childQueues.add(newQueue);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("updateChildQueues (action: add queue): " + added + " "
+          + getChildQueuesToPrint());
+    }
+  }
+
+  synchronized void removeChildQueue(CSQueue remQueue)
+      throws SchedulerDynamicEditException {
+    if (remQueue.getCapacity() > 0) {
+      throw new SchedulerDynamicEditException("Queue " + remQueue
+          + " being removed has non zero capacity.");
+    }
+    Iterator<CSQueue> qiter = childQueues.iterator();
+    while (qiter.hasNext()) {
+      CSQueue cs = qiter.next();
+      if (cs.equals(remQueue)) {
+        qiter.remove();
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Removed child queue: {}", cs.getQueueName());
+        }
+      }
+    }
+  }
+
+  protected synchronized float sumOfChildCapacities() {
+    float ret = 0;
+    for (CSQueue l : childQueues) {
+      ret += l.getCapacity();
+    }
+    return ret;
+  }
+
+  private void updateQuotas(int userLimit, float userLimitFactor,
+      int maxAppsForReservation, int maxAppsPerUserForReservation) {
+    this.userLimit = userLimit;
+    this.userLimitFactor = userLimitFactor;
+    this.maxAppsForReservation = maxAppsForReservation;
+    this.maxAppsPerUserForReservation = maxAppsPerUserForReservation;
+  }
+
+  /**
+   * Number of maximum applications for each of the reservations in this Plan.
+   *
+   * @return maxAppsForreservation
+   */
+  public int getMaxApplicationsForReservations() {
+    return maxAppsForReservation;
+  }
+
+  /**
+   * Number of maximum applications per user for each of the reservations in
+   * this Plan.
+   *
+   * @return maxAppsPerUserForreservation
+   */
+  public int getMaxApplicationsPerUserForReservation() {
+    return maxAppsPerUserForReservation;
+  }
+
+  /**
+   * User limit value for each of the reservations in this Plan.
+   *
+   * @return userLimit
+   */
+  public int getUserLimitForReservation() {
+    return userLimit;
+  }
+
+  /**
+   * User limit factor value for each of the reservations in this Plan.
+   *
+   * @return userLimitFactor
+   */
+  public float getUserLimitFactor() {
+    return userLimitFactor;
+  }
+
+  /**
+   * Determine whether to hide/show the ReservationQueues
+   */
+  public boolean showReservationsAsQueues() {
+    return showReservationsAsQueues;
+  }
+}

+ 98 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ReservationQueue.java

@@ -0,0 +1,98 @@
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import java.io.IOException;
+
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This represents a dynamic {@link LeafQueue} managed by the
+ * {@link ReservationSystem}
+ *
+ */
+public class ReservationQueue extends LeafQueue {
+
+  private static final Logger LOG = LoggerFactory
+      .getLogger(ReservationQueue.class);
+
+  private PlanQueue parent;
+
+  private int maxSystemApps;
+
+  public ReservationQueue(CapacitySchedulerContext cs, String queueName,
+      PlanQueue parent) {
+    super(cs, queueName, parent, null);
+    maxSystemApps = cs.getConfiguration().getMaximumSystemApplications();
+    // the following parameters are common to all reservation in the plan
+    updateQuotas(parent.getUserLimitForReservation(),
+        parent.getUserLimitFactor(),
+        parent.getMaxApplicationsForReservations(),
+        parent.getMaxApplicationsPerUserForReservation());
+    this.parent = parent;
+  }
+
+  @Override
+  public synchronized void reinitialize(CSQueue newlyParsedQueue,
+      Resource clusterResource) throws IOException {
+    // Sanity check
+    if (!(newlyParsedQueue instanceof ReservationQueue)
+        || !newlyParsedQueue.getQueuePath().equals(getQueuePath())) {
+      throw new IOException("Trying to reinitialize " + getQueuePath()
+          + " from " + newlyParsedQueue.getQueuePath());
+    }
+    CSQueueUtils.updateQueueStatistics(
+        parent.schedulerContext.getResourceCalculator(), newlyParsedQueue,
+        parent, parent.schedulerContext.getClusterResource(),
+        parent.schedulerContext.getMinimumResourceCapability());
+    super.reinitialize(newlyParsedQueue, clusterResource);
+    updateQuotas(parent.getUserLimitForReservation(),
+        parent.getUserLimitFactor(),
+        parent.getMaxApplicationsForReservations(),
+        parent.getMaxApplicationsPerUserForReservation());
+  }
+
+  /**
+   * This methods to change capacity for a queue and adjusts its
+   * absoluteCapacity
+   * 
+   * @param entitlement the new entitlement for the queue (capacity,
+   *          maxCapacity, etc..)
+   * @throws SchedulerDynamicEditException
+   */
+  public synchronized void setEntitlement(QueueEntitlement entitlement)
+      throws SchedulerDynamicEditException {
+    float capacity = entitlement.getCapacity();
+    if (capacity < 0 || capacity > 1.0f) {
+      throw new SchedulerDynamicEditException(
+          "Capacity demand is not in the [0,1] range: " + capacity);
+    }
+    setCapacity(capacity);
+    setAbsoluteCapacity(getParent().getAbsoluteCapacity() * getCapacity());
+    setMaxApplications((int) (maxSystemApps * getAbsoluteCapacity()));
+    // note: we currently set maxCapacity to capacity
+    // this might be revised later
+    setMaxCapacity(entitlement.getMaxCapacity());
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("successfully changed to " + capacity + " for queue "
+          + this.getQueueName());
+    }
+  }
+
+  private void updateQuotas(int userLimit, float userLimitFactor,
+      int maxAppsForReservation, int maxAppsPerUserForReservation) {
+    setUserLimit(userLimit);
+    setUserLimitFactor(userLimitFactor);
+    setMaxApplications(maxAppsForReservation);
+    maxApplicationsPerUser = maxAppsPerUserForReservation;
+  }
+
+  // used by the super constructor, we initialize to zero
+  protected float getCapacityFromConf() {
+    return 0f;
+  }
+
+}

+ 28 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/QueueEntitlement.java

@@ -0,0 +1,28 @@
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.common;
+
+public class QueueEntitlement {
+
+  private float capacity;
+  private float maxCapacity;
+
+  public QueueEntitlement(float capacity, float maxCapacity){
+    this.setCapacity(capacity);
+    this.maxCapacity = maxCapacity;
+   }
+
+  public float getMaxCapacity() {
+    return maxCapacity;
+  }
+
+  public void setMaxCapacity(float maxCapacity) {
+    this.maxCapacity = maxCapacity;
+  }
+
+  public float getCapacity() {
+    return capacity;
+  }
+
+  public void setCapacity(float capacity) {
+    this.capacity = capacity;
+  }
+}

+ 9 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/CapacitySchedulerQueueInfo.java

@@ -25,6 +25,7 @@ import javax.xml.bind.annotation.XmlTransient;
 
 import org.apache.hadoop.yarn.api.records.QueueState;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.PlanQueue;
 
 @XmlRootElement
 @XmlAccessorType(XmlAccessType.FIELD)
@@ -48,6 +49,7 @@ public class CapacitySchedulerQueueInfo {
   protected QueueState state;
   protected CapacitySchedulerQueueInfoList queues;
   protected ResourceInfo resourcesUsed;
+  private boolean hideReservationQueues = true;
 
   CapacitySchedulerQueueInfo() {
   };
@@ -69,6 +71,10 @@ public class CapacitySchedulerQueueInfo {
     queueName = q.getQueueName();
     state = q.getState();
     resourcesUsed = new ResourceInfo(q.getUsedResources());
+    if(q instanceof PlanQueue &&
+       ((PlanQueue)q).showReservationsAsQueues()) {
+      hideReservationQueues = false;
+    }
   }
 
   public float getCapacity() {
@@ -112,6 +118,9 @@ public class CapacitySchedulerQueueInfo {
   }
 
   public CapacitySchedulerQueueInfoList getQueues() {
+    if(hideReservationQueues) {
+      return new CapacitySchedulerQueueInfoList();
+    }
     return this.queues;
   }
 

+ 1 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java

@@ -408,7 +408,7 @@ public class TestCapacityScheduler {
     cs.stop();
   }
 
-  private void checkQueueCapacities(CapacityScheduler cs,
+  void checkQueueCapacities(CapacityScheduler cs,
       float capacityA, float capacityB) {
     CSQueue rootQueue = cs.getRootQueue();
     CSQueue queueA = findQueue(rootQueue, A);

+ 282 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerDynamicBehavior.java

@@ -0,0 +1,282 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestCapacitySchedulerDynamicBehavior {
+  private static final Log LOG = LogFactory
+      .getLog(TestCapacitySchedulerDynamicBehavior.class);
+  private static final String A = CapacitySchedulerConfiguration.ROOT + ".a";
+  private static final String B = CapacitySchedulerConfiguration.ROOT + ".b";
+  private static final String B1 = B + ".b1";
+  private static final String B2 = B + ".b2";
+  private static final String B3 = B + ".b3";
+  private static float A_CAPACITY = 10.5f;
+  private static float B_CAPACITY = 89.5f;
+  private static float A1_CAPACITY = 30;
+  private static float A2_CAPACITY = 70;
+  private static float B1_CAPACITY = 79.2f;
+  private static float B2_CAPACITY = 0.8f;
+  private static float B3_CAPACITY = 20;
+
+  private final TestCapacityScheduler tcs = new TestCapacityScheduler();
+
+  private int GB = 1024;
+
+  private MockRM rm;
+
+  @Before
+  public void setUp() {
+    CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
+    setupPlanQueueConfiguration(conf);
+    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+        ResourceScheduler.class);
+    conf.setBoolean(YarnConfiguration.RM_RESERVATIONS_ENABLE, false);
+    rm = new MockRM(conf);
+    rm.start();
+  }
+
+  @Test
+  public void testRefreshQueuesWithReservations() throws Exception {
+    CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
+
+    // Test add one reservation dynamically and manually modify capacity
+    ReservationQueue a1 =
+        new ReservationQueue(cs, "a1", (PlanQueue) cs.getQueue("a"));
+    cs.addQueue(a1);
+    a1.setEntitlement(new QueueEntitlement(A1_CAPACITY / 100, 1f));
+
+    // Test add another reservation queue and use setEntitlement to modify
+    // capacity
+    ReservationQueue a2 =
+        new ReservationQueue(cs, "a2", (PlanQueue) cs.getQueue("a"));
+    cs.addQueue(a2);
+    cs.setEntitlement("a2", new QueueEntitlement(A2_CAPACITY / 100, 1.0f));
+
+    // Verify all allocations match
+    tcs.checkQueueCapacities(cs, A_CAPACITY, B_CAPACITY);
+
+    // Reinitialize and verify all dynamic queued survived
+    CapacitySchedulerConfiguration conf = cs.getConfiguration();
+    conf.setCapacity(A, 80f);
+    conf.setCapacity(B, 20f);
+    cs.reinitialize(conf, rm.getRMContext());
+
+    tcs.checkQueueCapacities(cs, 80f, 20f);
+  }
+
+  @Test
+  public void testAddQueueFailCases() throws Exception {
+    CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
+
+    try {
+      // Test invalid addition (adding non-zero size queue)
+      ReservationQueue a1 =
+          new ReservationQueue(cs, "a1", (PlanQueue) cs.getQueue("a"));
+      a1.setEntitlement(new QueueEntitlement(A1_CAPACITY / 100, 1f));
+      cs.addQueue(a1);
+      fail();
+    } catch (Exception e) {
+      // expected
+    }
+
+    // Test add one reservation dynamically and manually modify capacity
+    ReservationQueue a1 =
+        new ReservationQueue(cs, "a1", (PlanQueue) cs.getQueue("a"));
+    cs.addQueue(a1);
+    a1.setEntitlement(new QueueEntitlement(A1_CAPACITY / 100, 1f));
+
+    // Test add another reservation queue and use setEntitlement to modify
+    // capacity
+    ReservationQueue a2 =
+        new ReservationQueue(cs, "a2", (PlanQueue) cs.getQueue("a"));
+
+    cs.addQueue(a2);
+
+    try {
+      // Test invalid entitlement (sum of queues exceed 100%)
+      cs.setEntitlement("a2", new QueueEntitlement(A2_CAPACITY / 100 + 0.1f,
+          1.0f));
+      fail();
+    } catch (Exception e) {
+      // expected
+    }
+
+    cs.setEntitlement("a2", new QueueEntitlement(A2_CAPACITY / 100, 1.0f));
+
+    // Verify all allocations match
+    tcs.checkQueueCapacities(cs, A_CAPACITY, B_CAPACITY);
+
+    cs.stop();
+  }
+
+  @Test
+  public void testRemoveQueue() throws Exception {
+    CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
+
+    // Test add one reservation dynamically and manually modify capacity
+    ReservationQueue a1 =
+        new ReservationQueue(cs, "a1", (PlanQueue) cs.getQueue("a"));
+    cs.addQueue(a1);
+    a1.setEntitlement(new QueueEntitlement(A1_CAPACITY / 100, 1f));
+
+    // submit an app
+    RMApp app = rm.submitApp(GB, "test-move-1", "user_0", null, "a1");
+    // check preconditions
+    List<ApplicationAttemptId> appsInA1 = cs.getAppsInQueue("a1");
+    assertEquals(1, appsInA1.size());
+    try {
+      cs.removeQueue("a1");
+      fail();
+    } catch (SchedulerDynamicEditException s) {
+      // expected a1 contains applications
+    }
+    // clear queue by killling all apps
+    cs.killAllAppsInQueue("a1");
+    // wait for events of move to propagate
+    rm.waitForState(app.getApplicationId(), RMAppState.KILLED);
+
+    try {
+      cs.removeQueue("a1");
+      fail();
+    } catch (SchedulerDynamicEditException s) {
+      // expected a1 is not zero capacity
+    }
+    // set capacity to zero
+    cs.setEntitlement("a1", new QueueEntitlement(0f, 0f));
+    cs.removeQueue("a1");
+
+    assertTrue(cs.getQueue("a1") == null);
+
+    rm.stop();
+  }
+
+  @Test
+  public void testMoveAppToPlanQueue() throws Exception {
+    CapacityScheduler scheduler = (CapacityScheduler) rm.getResourceScheduler();
+
+    // submit an app
+    RMApp app = rm.submitApp(GB, "test-move-1", "user_0", null, "b1");
+    ApplicationAttemptId appAttemptId =
+        rm.getApplicationReport(app.getApplicationId())
+            .getCurrentApplicationAttemptId();
+
+    // check preconditions
+    List<ApplicationAttemptId> appsInB1 = scheduler.getAppsInQueue("b1");
+    assertEquals(1, appsInB1.size());
+
+    List<ApplicationAttemptId> appsInB = scheduler.getAppsInQueue("b");
+    assertEquals(1, appsInB.size());
+    assertTrue(appsInB.contains(appAttemptId));
+
+    List<ApplicationAttemptId> appsInA = scheduler.getAppsInQueue("a");
+    assertTrue(appsInA.isEmpty());
+
+    String queue =
+        scheduler.getApplicationAttempt(appsInB1.get(0)).getQueue()
+            .getQueueName();
+    Assert.assertTrue(queue.equals("b1"));
+
+    List<ApplicationAttemptId> appsInRoot = scheduler.getAppsInQueue("root");
+    assertTrue(appsInRoot.contains(appAttemptId));
+    assertEquals(1, appsInRoot.size());
+
+    // create the default reservation queue
+    String defQName = "a" + PlanQueue.DEFAULT_QUEUE_SUFFIX;
+    ReservationQueue defQ =
+        new ReservationQueue(scheduler, defQName,
+            (PlanQueue) scheduler.getQueue("a"));
+    scheduler.addQueue(defQ);
+    defQ.setEntitlement(new QueueEntitlement(1f, 1f));
+
+    List<ApplicationAttemptId> appsInDefQ = scheduler.getAppsInQueue(defQName);
+    assertTrue(appsInDefQ.isEmpty());
+
+    // now move the app to plan queue
+    scheduler.moveApplication(app.getApplicationId(), "a");
+
+    // check postconditions
+    appsInDefQ = scheduler.getAppsInQueue(defQName);
+    assertEquals(1, appsInDefQ.size());
+    queue =
+        scheduler.getApplicationAttempt(appsInDefQ.get(0)).getQueue()
+            .getQueueName();
+    Assert.assertTrue(queue.equals(defQName));
+
+    appsInA = scheduler.getAppsInQueue("a");
+    assertTrue(appsInA.contains(appAttemptId));
+    assertEquals(1, appsInA.size());
+
+    appsInRoot = scheduler.getAppsInQueue("root");
+    assertTrue(appsInRoot.contains(appAttemptId));
+    assertEquals(1, appsInRoot.size());
+
+    appsInB1 = scheduler.getAppsInQueue("b1");
+    assertTrue(appsInB1.isEmpty());
+
+    appsInB = scheduler.getAppsInQueue("b");
+    assertTrue(appsInB.isEmpty());
+
+    rm.stop();
+  }
+
+  private void setupPlanQueueConfiguration(CapacitySchedulerConfiguration conf) {
+
+    conf.setQueues(CapacitySchedulerConfiguration.ROOT,
+        new String[] { "a", "b" });
+
+    conf.setCapacity(A, A_CAPACITY);
+    conf.setCapacity(B, B_CAPACITY);
+
+    // Define 2nd-level queues
+    conf.setQueues(B, new String[] { "b1", "b2", "b3" });
+    conf.setCapacity(B1, B1_CAPACITY);
+    conf.setUserLimitFactor(B1, 100.0f);
+    conf.setCapacity(B2, B2_CAPACITY);
+    conf.setUserLimitFactor(B2, 100.0f);
+    conf.setCapacity(B3, B3_CAPACITY);
+    conf.setUserLimitFactor(B3, 100.0f);
+
+    conf.setReservableQueue(A, true);
+    conf.setReservationWindow(A, 86400 * 1000);
+    conf.setAverageCapacity(A, 1.0f);
+
+    LOG.info("Setup a as a plan queue");
+  }
+
+}

+ 103 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservationQueue.java

@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement;
+import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.Resources;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestReservationQueue {
+
+  CapacitySchedulerConfiguration csConf;
+  CapacitySchedulerContext csContext;
+  final static int GB = 1024;
+  private final ResourceCalculator resourceCalculator =
+      new DefaultResourceCalculator();
+  ReservationQueue reservationQueue;
+
+  @Before
+  public void setup() {
+
+    // setup a context / conf
+    csConf = new CapacitySchedulerConfiguration();
+    YarnConfiguration conf = new YarnConfiguration();
+    csContext = mock(CapacitySchedulerContext.class);
+    when(csContext.getConfiguration()).thenReturn(csConf);
+    when(csContext.getConf()).thenReturn(conf);
+    when(csContext.getMinimumResourceCapability()).thenReturn(
+        Resources.createResource(GB, 1));
+    when(csContext.getMaximumResourceCapability()).thenReturn(
+        Resources.createResource(16 * GB, 32));
+    when(csContext.getClusterResource()).thenReturn(
+        Resources.createResource(100 * 16 * GB, 100 * 32));
+    when(csContext.getResourceCalculator()).thenReturn(resourceCalculator);
+
+    // create a queue
+    PlanQueue pq = new PlanQueue(csContext, "root", null, null);
+    reservationQueue = new ReservationQueue(csContext, "a", pq);
+
+  }
+
+  @Test
+  public void testAddSubtractCapacity() throws Exception {
+
+    // verify that setting, adding, subtracting capacity works
+    reservationQueue.setCapacity(1.0F);
+    assertTrue(" actual capacity: " + reservationQueue.getCapacity(),
+        reservationQueue.getCapacity() - 1 < CSQueueUtils.EPSILON);
+    reservationQueue.setEntitlement(new QueueEntitlement(0.9f, 1f));
+    assertTrue(" actual capacity: " + reservationQueue.getCapacity(),
+        reservationQueue.getCapacity() - 0.9 < CSQueueUtils.EPSILON);
+    reservationQueue.setEntitlement(new QueueEntitlement(1f, 1f));
+    assertTrue(" actual capacity: " + reservationQueue.getCapacity(),
+        reservationQueue.getCapacity() - 1 < CSQueueUtils.EPSILON);
+    reservationQueue.setEntitlement(new QueueEntitlement(0f, 1f));
+    assertTrue(" actual capacity: " + reservationQueue.getCapacity(),
+        reservationQueue.getCapacity() < CSQueueUtils.EPSILON);
+
+    try {
+      reservationQueue.setEntitlement(new QueueEntitlement(1.1f, 1f));
+      fail();
+    } catch (SchedulerDynamicEditException iae) {
+      // expected
+      assertTrue(" actual capacity: " + reservationQueue.getCapacity(),
+          reservationQueue.getCapacity() - 1 < CSQueueUtils.EPSILON);
+    }
+
+    try {
+      reservationQueue.setEntitlement(new QueueEntitlement(-0.1f, 1f));
+      fail();
+    } catch (SchedulerDynamicEditException iae) {
+      // expected
+      assertTrue(" actual capacity: " + reservationQueue.getCapacity(),
+          reservationQueue.getCapacity() - 1 < CSQueueUtils.EPSILON);
+    }
+
+  }
+}