Browse Source

YARN-2998. Abstract out scheduler independent PlanFollower components. (Anubhav Dhoot via kasha)

Karthik Kambatla 10 năm trước cách đây
mục cha
commit
e7257acd8a
11 tập tin đã thay đổi với 755 bổ sung450 xóa
  1. 6 0
      hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java
  2. 5 0
      hadoop-yarn-project/CHANGES.txt
  3. 412 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractSchedulerPlanFollower.java
  4. 77 284
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacitySchedulerPlanFollower.java
  5. 5 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java
  6. 2 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
  7. 2 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
  8. 5 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
  9. 2 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java
  10. 48 161
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacitySchedulerPlanFollower.java
  11. 191 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestSchedulerPlanFollowerBase.java

+ 6 - 0
hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java

@@ -84,6 +84,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoSchedule
 import org.apache.hadoop.yarn.sls.SLSRunner;
 import org.apache.hadoop.yarn.sls.conf.SLSConfiguration;
 import org.apache.hadoop.yarn.sls.web.SLSWebApp;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.Resources;
 import org.apache.log4j.Logger;
 
@@ -866,6 +867,11 @@ final public class ResourceSchedulerWrapper
     return scheduler.getMaximumResourceCapability();
   }
 
+  @Override
+  public ResourceCalculator getResourceCalculator() {
+    return scheduler.getResourceCalculator();
+  }
+
   @Override
   public int getNumClusterNodes() {
     return scheduler.getNumClusterNodes();

+ 5 - 0
hadoop-yarn-project/CHANGES.txt

@@ -70,6 +70,7 @@ Release 2.7.0 - UNRELEASED
     YARN-2738. [YARN-2574] Add FairReservationSystem for FairScheduler. 
     (Anubhav Dhoot via kasha)
 
+
   IMPROVEMENTS
 
     YARN-2950. Change message to mandate, not suggest JS requirement on UI.
@@ -157,6 +158,10 @@ Release 2.7.0 - UNRELEASED
 
     YARN-2943. Added node-labels page on RM web UI. (Wangda Tan via jianhe)
 
+    YARN-2998. Abstract out scheduler independent PlanFollower components. 
+    (Anubhav Dhoot via kasha)
+
+
   OPTIMIZATIONS
 
   BUG FIXES

+ 412 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractSchedulerPlanFollower.java

@@ -0,0 +1,412 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.reservation;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ReservationId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.PlanQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement;
+
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.hadoop.yarn.util.resource.Resources;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+public abstract class AbstractSchedulerPlanFollower implements PlanFollower {
+  private static final Logger LOG = LoggerFactory
+      .getLogger(CapacitySchedulerPlanFollower.class);
+
+  protected Collection<Plan> plans = new ArrayList<Plan>();
+  protected YarnScheduler scheduler;
+  protected Clock clock;
+
+  @Override
+  public void init(Clock clock, ResourceScheduler sched, Collection<Plan> plans) {
+    this.clock = clock;
+    this.scheduler = sched;
+    this.plans.addAll(plans);
+  }
+
+  @Override
+  public synchronized void run() {
+    for (Plan plan : plans) {
+      synchronizePlan(plan);
+    }
+  }
+
+  @Override
+  public synchronized void setPlans(Collection<Plan> plans) {
+    this.plans.clear();
+    this.plans.addAll(plans);
+  }
+
+  @Override
+  public synchronized void synchronizePlan(Plan plan) {
+     String planQueueName = plan.getQueueName();
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Running plan follower edit policy for plan: " + planQueueName);
+    }
+    // align with plan step
+    long step = plan.getStep();
+    long now = clock.getTime();
+    if (now % step != 0) {
+      now += step - (now % step);
+    }
+    Queue planQueue = getPlanQueue(planQueueName);
+    if (planQueue == null) return;
+
+    // first we publish to the plan the current availability of resources
+    Resource clusterResources = scheduler.getClusterResource();
+    Resource planResources = getPlanResources(plan, planQueue,
+        clusterResources);
+
+    Set<ReservationAllocation> currentReservations =
+        plan.getReservationsAtTime(now);
+    Set<String> curReservationNames = new HashSet<String>();
+    Resource reservedResources = Resource.newInstance(0, 0);
+    int numRes = getReservedResources(now, currentReservations,
+        curReservationNames, reservedResources);
+
+    // create the default reservation queue if it doesnt exist
+    String defReservationId = getReservationIdFromQueueName(planQueueName) +
+        PlanQueue.DEFAULT_QUEUE_SUFFIX;
+    String defReservationQueue = getReservationQueueName(planQueueName,
+        defReservationId);
+    createDefaultReservationQueue(planQueueName, planQueue,
+        defReservationId);
+    curReservationNames.add(defReservationId);
+
+    // if the resources dedicated to this plan has shrunk invoke replanner
+    if (arePlanResourcesLessThanReservations(clusterResources, planResources,
+        reservedResources)) {
+      try {
+        plan.getReplanner().plan(plan, null);
+      } catch (PlanningException e) {
+        LOG.warn("Exception while trying to replan: {}", planQueueName, e);
+      }
+    }
+    // identify the reservations that have expired and new reservations that
+    // have to be activated
+    List<? extends Queue> resQueues = getChildReservationQueues(planQueue);
+    Set<String> expired = new HashSet<String>();
+    for (Queue resQueue : resQueues) {
+      String resQueueName = resQueue.getQueueName();
+      String reservationId = getReservationIdFromQueueName(resQueueName);
+      if (curReservationNames.contains(reservationId)) {
+        // it is already existing reservation, so needed not create new
+        // reservation queue
+        curReservationNames.remove(reservationId);
+      } else {
+        // the reservation has termination, mark for cleanup
+        expired.add(reservationId);
+      }
+    }
+    // garbage collect expired reservations
+    cleanupExpiredQueues(planQueueName, plan.getMoveOnExpiry(), expired,
+        defReservationQueue);
+
+    // Add new reservations and update existing ones
+    float totalAssignedCapacity = 0f;
+    if (currentReservations != null) {
+      // first release all excess capacity in default queue
+      try {
+        setQueueEntitlement(planQueueName, defReservationQueue, 0f, 1.0f);
+      } catch (YarnException e) {
+        LOG.warn(
+            "Exception while trying to release default queue capacity for plan: {}",
+            planQueueName, e);
+      }
+      // sort allocations from the one giving up the most resources, to the
+      // one asking for the most
+      // avoid order-of-operation errors that temporarily violate 100%
+      // capacity bound
+      List<ReservationAllocation> sortedAllocations =
+          sortByDelta(
+              new ArrayList<ReservationAllocation>(currentReservations), now,
+              plan);
+      for (ReservationAllocation res : sortedAllocations) {
+        String currResId = res.getReservationId().toString();
+        if (curReservationNames.contains(currResId)) {
+          addReservationQueue(planQueueName, planQueue, currResId);
+        }
+        Resource capToAssign = res.getResourcesAtTime(now);
+        float targetCapacity = 0f;
+        if (planResources.getMemory() > 0
+            && planResources.getVirtualCores() > 0) {
+          targetCapacity =
+              calculateReservationToPlanRatio(clusterResources,
+                  planResources,
+                  capToAssign);
+        }
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(
+              "Assigning capacity of {} to queue {} with target capacity {}",
+              capToAssign, currResId, targetCapacity);
+        }
+        // set maxCapacity to 100% unless the job requires gang, in which
+        // case we stick to capacity (as running early/before is likely a
+        // waste of resources)
+        float maxCapacity = 1.0f;
+        if (res.containsGangs()) {
+          maxCapacity = targetCapacity;
+        }
+        try {
+          setQueueEntitlement(planQueueName, currResId, targetCapacity, maxCapacity);
+        } catch (YarnException e) {
+          LOG.warn("Exception while trying to size reservation for plan: {}",
+              currResId, planQueueName, e);
+        }
+        totalAssignedCapacity += targetCapacity;
+      }
+    }
+    // compute the default queue capacity
+    float defQCap = 1.0f - totalAssignedCapacity;
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("PlanFollowerEditPolicyTask: total Plan Capacity: {} "
+          + "currReservation: {} default-queue capacity: {}", planResources,
+          numRes, defQCap);
+    }
+    // set the default queue to eat-up all remaining capacity
+    try {
+      setQueueEntitlement(planQueueName, defReservationQueue, defQCap, 1.0f);
+    } catch (YarnException e) {
+      LOG.warn(
+          "Exception while trying to reclaim default queue capacity for plan: {}",
+          planQueueName, e);
+    }
+    // garbage collect finished reservations from plan
+    try {
+      plan.archiveCompletedReservations(now);
+    } catch (PlanningException e) {
+      LOG.error("Exception in archiving completed reservations: ", e);
+    }
+    LOG.info("Finished iteration of plan follower edit policy for plan: "
+        + planQueueName);
+
+    // Extension: update plan with app states,
+    // useful to support smart replanning
+  }
+
+  protected String getReservationIdFromQueueName(String resQueueName) {
+    return resQueueName;
+  }
+
+  protected void setQueueEntitlement(String planQueueName, String currResId,
+      float targetCapacity,
+      float maxCapacity) throws YarnException {
+    String reservationQueueName = getReservationQueueName(planQueueName,
+        currResId);
+    scheduler.setEntitlement(reservationQueueName, new QueueEntitlement(
+        targetCapacity, maxCapacity));
+  }
+
+  // Schedulers have different ways of naming queues. See YARN-2773
+  protected String getReservationQueueName(String planQueueName,
+      String reservationId) {
+    return reservationId;
+  }
+
+  /**
+   * First sets entitlement of queues to zero to prevent new app submission.
+   * Then move all apps in the set of queues to the parent plan queue's default
+   * reservation queue if move is enabled. Finally cleanups the queue by killing
+   * any apps (if move is disabled or move failed) and removing the queue
+   */
+  protected void cleanupExpiredQueues(String planQueueName,
+      boolean shouldMove, Set<String> toRemove, String defReservationQueue) {
+    for (String expiredReservationId : toRemove) {
+      try {
+        // reduce entitlement to 0
+        String expiredReservation = getReservationQueueName(planQueueName,
+            expiredReservationId);
+        setQueueEntitlement(planQueueName, expiredReservation, 0.0f, 0.0f);
+        if (shouldMove) {
+          moveAppsInQueueSync(expiredReservation, defReservationQueue);
+        }
+        if (scheduler.getAppsInQueue(expiredReservation).size() > 0) {
+          scheduler.killAllAppsInQueue(expiredReservation);
+          LOG.info("Killing applications in queue: {}", expiredReservation);
+        } else {
+          scheduler.removeQueue(expiredReservation);
+          LOG.info("Queue: " + expiredReservation + " removed");
+        }
+      } catch (YarnException e) {
+        LOG.warn("Exception while trying to expire reservation: {}",
+            expiredReservationId, e);
+      }
+    }
+  }
+
+  /**
+   * Move all apps in the set of queues to the parent plan queue's default
+   * reservation queue in a synchronous fashion
+   */
+  private void moveAppsInQueueSync(String expiredReservation,
+                                   String defReservationQueue) {
+    List<ApplicationAttemptId> activeApps =
+        scheduler.getAppsInQueue(expiredReservation);
+    if (activeApps.isEmpty()) {
+      return;
+    }
+    for (ApplicationAttemptId app : activeApps) {
+      // fallback to parent's default queue
+      try {
+        scheduler.moveApplication(app.getApplicationId(), defReservationQueue);
+      } catch (YarnException e) {
+        LOG.warn(
+            "Encountered unexpected error during migration of application: {}" +
+                " from reservation: {}",
+            app, expiredReservation, e);
+      }
+    }
+  }
+
+  protected int getReservedResources(long now, Set<ReservationAllocation>
+      currentReservations, Set<String> curReservationNames,
+                                     Resource reservedResources) {
+    int numRes = 0;
+    if (currentReservations != null) {
+      numRes = currentReservations.size();
+      for (ReservationAllocation reservation : currentReservations) {
+        curReservationNames.add(reservation.getReservationId().toString());
+        Resources.addTo(reservedResources, reservation.getResourcesAtTime(now));
+      }
+    }
+    return numRes;
+  }
+
+  /**
+   * Sort in the order from the least new amount of resources asked (likely
+   * negative) to the highest. This prevents "order-of-operation" errors related
+   * to exceeding 100% capacity temporarily.
+   */
+  protected List<ReservationAllocation> sortByDelta(
+      List<ReservationAllocation> currentReservations, long now, Plan plan) {
+    Collections.sort(currentReservations, new ReservationAllocationComparator(
+        now, this, plan));
+    return currentReservations;
+  }
+
+  /**
+   * Get queue associated with reservable queue named
+   * @param planQueueName Name of the reservable queue
+   * @return queue associated with the reservable queue
+   */
+  protected abstract Queue getPlanQueue(String planQueueName);
+
+  /**
+   * Calculates ratio of reservationResources to planResources
+   */
+  protected abstract float calculateReservationToPlanRatio(
+      Resource clusterResources, Resource planResources,
+      Resource reservationResources);
+
+  /**
+   * Check if plan resources are less than expected reservation resources
+   */
+  protected abstract boolean arePlanResourcesLessThanReservations(
+      Resource clusterResources, Resource planResources,
+      Resource reservedResources);
+
+  /**
+   * Get a list of reservation queues for this planQueue
+   */
+  protected abstract List<? extends Queue> getChildReservationQueues(
+      Queue planQueue);
+
+  /**
+   * Add a new reservation queue for reservation currResId for this planQueue
+   */
+  protected abstract void addReservationQueue(
+      String planQueueName, Queue queue, String currResId);
+
+  /**
+   * Creates the default reservation queue for use when no reservation is
+   * used for applications submitted to this planQueue
+   */
+  protected abstract void createDefaultReservationQueue(
+      String planQueueName, Queue queue, String defReservationQueue);
+
+  /**
+   * Get plan resources for this planQueue
+   */
+  protected abstract Resource getPlanResources(
+      Plan plan, Queue queue, Resource clusterResources);
+
+  /**
+   * Get reservation queue resources if it exists otherwise return null
+   */
+  protected abstract Resource getReservationQueueResourceIfExists(Plan plan,
+      ReservationId reservationId);
+
+  private static class ReservationAllocationComparator implements
+      Comparator<ReservationAllocation> {
+    AbstractSchedulerPlanFollower planFollower;
+    long now;
+    Plan plan;
+
+    ReservationAllocationComparator(long now,
+        AbstractSchedulerPlanFollower planFollower, Plan plan) {
+      this.now = now;
+      this.planFollower = planFollower;
+      this.plan = plan;
+    }
+
+    private Resource getUnallocatedReservedResources(
+        ReservationAllocation reservation) {
+      Resource resResource;
+      Resource reservationResource = planFollower
+          .getReservationQueueResourceIfExists
+              (plan, reservation.getReservationId());
+      if (reservationResource != null) {
+        resResource =
+            Resources.subtract(
+                reservation.getResourcesAtTime(now),
+                reservationResource);
+      } else {
+        resResource = reservation.getResourcesAtTime(now);
+      }
+      return resResource;
+    }
+
+    @Override
+    public int compare(ReservationAllocation lhs, ReservationAllocation rhs) {
+      // compute delta between current and previous reservation, and compare
+      // based on that
+      Resource lhsRes = getUnallocatedReservedResources(lhs);
+      Resource rhsRes = getUnallocatedReservedResources(rhs);
+      return lhsRes.compareTo(rhsRes);
+    }
+  }
+}
+

+ 77 - 284
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacitySchedulerPlanFollower.java

@@ -19,26 +19,19 @@
 package org.apache.hadoop.yarn.server.resourcemanager.reservation;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashSet;
 import java.util.List;
-import java.util.Set;
 
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ReservationId;
 import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
-import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.PlanQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ReservationQueue;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement;
 import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.resource.Resources;
 import org.slf4j.Logger;
@@ -58,319 +51,119 @@ import org.slf4j.LoggerFactory;
  * differences among existing queues). This makes it resilient to frequency of
  * synchronization, and RM restart issues (no "catch up" is necessary).
  */
-public class CapacitySchedulerPlanFollower implements PlanFollower {
+public class CapacitySchedulerPlanFollower extends AbstractSchedulerPlanFollower {
 
   private static final Logger LOG = LoggerFactory
       .getLogger(CapacitySchedulerPlanFollower.class);
 
-  private Collection<Plan> plans = new ArrayList<Plan>();
-
-  private Clock clock;
-  private CapacityScheduler scheduler;
+  private CapacityScheduler cs;
 
   @Override
   public void init(Clock clock, ResourceScheduler sched, Collection<Plan> plans) {
+    super.init(clock, sched, plans);
     LOG.info("Initializing Plan Follower Policy:"
         + this.getClass().getCanonicalName());
     if (!(sched instanceof CapacityScheduler)) {
       throw new YarnRuntimeException(
           "CapacitySchedulerPlanFollower can only work with CapacityScheduler");
     }
-    this.clock = clock;
-    this.scheduler = (CapacityScheduler) sched;
-    this.plans.addAll(plans);
+    this.cs = (CapacityScheduler) sched;
   }
 
   @Override
-  public synchronized void run() {
-    for (Plan plan : plans) {
-      synchronizePlan(plan);
+  protected Queue getPlanQueue(String planQueueName) {
+    CSQueue queue = cs.getQueue(planQueueName);
+    if (!(queue instanceof PlanQueue)) {
+      LOG.error("The Plan is not an PlanQueue!");
+      return null;
     }
+    return queue;
   }
 
   @Override
-  public synchronized void synchronizePlan(Plan plan) {
-    String planQueueName = plan.getQueueName();
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Running plan follower edit policy for plan: " + planQueueName);
-    }
-    // align with plan step
-    long step = plan.getStep();
-    long now = clock.getTime();
-    if (now % step != 0) {
-      now += step - (now % step);
-    }
-    CSQueue queue = scheduler.getQueue(planQueueName);
-    if (!(queue instanceof PlanQueue)) {
-      LOG.error("The Plan is not an PlanQueue!");
-      return;
-    }
-    PlanQueue planQueue = (PlanQueue) queue;
-    // first we publish to the plan the current availability of resources
-    Resource clusterResources = scheduler.getClusterResource();
-    float planAbsCap = planQueue.getAbsoluteCapacity();
-    Resource planResources = Resources.multiply(clusterResources, planAbsCap);
-    plan.setTotalCapacity(planResources);
+  protected float calculateReservationToPlanRatio(
+      Resource clusterResources, Resource planResources,
+      Resource reservationResources) {
+    return Resources.divide(cs.getResourceCalculator(),
+        clusterResources, reservationResources, planResources);
+  }
 
-    Set<ReservationAllocation> currentReservations =
-        plan.getReservationsAtTime(now);
-    Set<String> curReservationNames = new HashSet<String>();
-    Resource reservedResources = Resource.newInstance(0, 0);
-    int numRes = 0;
-    if (currentReservations != null) {
-      numRes = currentReservations.size();
-      for (ReservationAllocation reservation : currentReservations) {
-        curReservationNames.add(reservation.getReservationId().toString());
-        Resources.addTo(reservedResources, reservation.getResourcesAtTime(now));
-      }
+  @Override
+  protected boolean arePlanResourcesLessThanReservations(
+      Resource clusterResources, Resource planResources,
+      Resource reservedResources) {
+    return Resources.greaterThan(cs.getResourceCalculator(),
+        clusterResources, reservedResources, planResources);
+  }
+
+  @Override
+  protected List<? extends Queue> getChildReservationQueues(Queue queue) {
+    PlanQueue planQueue = (PlanQueue)queue;
+    List<CSQueue> childQueues = planQueue.getChildQueues();
+    return childQueues;
+  }
+
+  @Override
+  protected void addReservationQueue(
+      String planQueueName, Queue queue, String currResId) {
+    PlanQueue planQueue = (PlanQueue)queue;
+    try {
+      ReservationQueue resQueue =
+          new ReservationQueue(cs, currResId, planQueue);
+      cs.addQueue(resQueue);
+    } catch (SchedulerDynamicEditException e) {
+      LOG.warn(
+          "Exception while trying to activate reservation: {} for plan: {}",
+          currResId, planQueueName, e);
+    } catch (IOException e) {
+      LOG.warn(
+          "Exception while trying to activate reservation: {} for plan: {}",
+          currResId, planQueueName, e);
     }
-    // create the default reservation queue if it doesnt exist
-    String defReservationQueue = planQueueName + PlanQueue.DEFAULT_QUEUE_SUFFIX;
-    if (scheduler.getQueue(defReservationQueue) == null) {
+  }
+
+  @Override
+  protected void createDefaultReservationQueue(
+      String planQueueName, Queue queue, String defReservationId) {
+    PlanQueue planQueue = (PlanQueue)queue;
+    if (cs.getQueue(defReservationId) == null) {
       try {
         ReservationQueue defQueue =
-            new ReservationQueue(scheduler, defReservationQueue, planQueue);
-        scheduler.addQueue(defQueue);
+            new ReservationQueue(cs, defReservationId, planQueue);
+        cs.addQueue(defQueue);
       } catch (SchedulerDynamicEditException e) {
         LOG.warn(
             "Exception while trying to create default reservation queue for plan: {}",
             planQueueName, e);
       } catch (IOException e) {
         LOG.warn(
-            "Exception while trying to create default reservation queue for plan: {}",
-            planQueueName, e);
-      }
-    }
-    curReservationNames.add(defReservationQueue);
-    // if the resources dedicated to this plan has shrunk invoke replanner
-    if (Resources.greaterThan(scheduler.getResourceCalculator(),
-        clusterResources, reservedResources, planResources)) {
-      try {
-        plan.getReplanner().plan(plan, null);
-      } catch (PlanningException e) {
-        LOG.warn("Exception while trying to replan: {}", planQueueName, e);
-      }
-    }
-    // identify the reservations that have expired and new reservations that
-    // have to be activated
-    List<CSQueue> resQueues = planQueue.getChildQueues();
-    Set<String> expired = new HashSet<String>();
-    for (CSQueue resQueue : resQueues) {
-      String resQueueName = resQueue.getQueueName();
-      if (curReservationNames.contains(resQueueName)) {
-        // it is already existing reservation, so needed not create new
-        // reservation queue
-        curReservationNames.remove(resQueueName);
-      } else {
-        // the reservation has termination, mark for cleanup
-        expired.add(resQueueName);
-      }
-    }
-    // garbage collect expired reservations
-    cleanupExpiredQueues(plan.getMoveOnExpiry(), expired, defReservationQueue);
-
-    // Add new reservations and update existing ones
-    float totalAssignedCapacity = 0f;
-    if (currentReservations != null) {
-      // first release all excess capacity in default queue
-      try {
-        scheduler.setEntitlement(defReservationQueue, new QueueEntitlement(0f,
-            1.0f));
-      } catch (YarnException e) {
-        LOG.warn(
-            "Exception while trying to release default queue capacity for plan: {}",
+            "Exception while trying to create default reservation queue for " +
+                "plan: {}",
             planQueueName, e);
       }
-      // sort allocations from the one giving up the most resources, to the
-      // one asking for the most
-      // avoid order-of-operation errors that temporarily violate 100%
-      // capacity bound
-      List<ReservationAllocation> sortedAllocations =
-          sortByDelta(
-              new ArrayList<ReservationAllocation>(currentReservations), now);
-      for (ReservationAllocation res : sortedAllocations) {
-        String currResId = res.getReservationId().toString();
-        if (curReservationNames.contains(currResId)) {
-          try {
-            ReservationQueue resQueue =
-                new ReservationQueue(scheduler, currResId, planQueue);
-            scheduler.addQueue(resQueue);
-          } catch (SchedulerDynamicEditException e) {
-            LOG.warn(
-                "Exception while trying to activate reservation: {} for plan: {}",
-                currResId, planQueueName, e);
-          } catch (IOException e) {
-            LOG.warn(
-                "Exception while trying to activate reservation: {} for plan: {}",
-                currResId, planQueueName, e);
-          }
-        }
-        Resource capToAssign = res.getResourcesAtTime(now);
-        float targetCapacity = 0f;
-        if (planResources.getMemory() > 0
-            && planResources.getVirtualCores() > 0) {
-          targetCapacity =
-              Resources.divide(scheduler.getResourceCalculator(),
-                  clusterResources, capToAssign, planResources);
-        }
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(
-              "Assigning capacity of {} to queue {} with target capacity {}",
-              capToAssign, currResId, targetCapacity);
-        }
-        // set maxCapacity to 100% unless the job requires gang, in which
-        // case we stick to capacity (as running early/before is likely a
-        // waste of resources)
-        float maxCapacity = 1.0f;
-        if (res.containsGangs()) {
-          maxCapacity = targetCapacity;
-        }
-        try {
-          scheduler.setEntitlement(currResId, new QueueEntitlement(
-              targetCapacity, maxCapacity));
-        } catch (YarnException e) {
-          LOG.warn("Exception while trying to size reservation for plan: {}",
-              currResId, planQueueName, e);
-        }
-        totalAssignedCapacity += targetCapacity;
-      }
-    }
-    // compute the default queue capacity
-    float defQCap = 1.0f - totalAssignedCapacity;
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("PlanFollowerEditPolicyTask: total Plan Capacity: {} "
-          + "currReservation: {} default-queue capacity: {}", planResources,
-          numRes, defQCap);
-    }
-    // set the default queue to eat-up all remaining capacity
-    try {
-      scheduler.setEntitlement(defReservationQueue, new QueueEntitlement(
-          defQCap, 1.0f));
-    } catch (YarnException e) {
-      LOG.warn(
-          "Exception while trying to reclaim default queue capacity for plan: {}",
-          planQueueName, e);
-    }
-    // garbage collect finished reservations from plan
-    try {
-      plan.archiveCompletedReservations(now);
-    } catch (PlanningException e) {
-      LOG.error("Exception in archiving completed reservations: ", e);
-    }
-    LOG.info("Finished iteration of plan follower edit policy for plan: "
-        + planQueueName);
-
-    // Extension: update plan with app states,
-    // useful to support smart replanning
-  }
-
-  /**
-   * Move all apps in the set of queues to the parent plan queue's default
-   * reservation queue in a synchronous fashion
-   */
-  private void moveAppsInQueueSync(String expiredReservation,
-      String defReservationQueue) {
-    List<ApplicationAttemptId> activeApps =
-        scheduler.getAppsInQueue(expiredReservation);
-    if (activeApps.isEmpty()) {
-      return;
-    }
-    for (ApplicationAttemptId app : activeApps) {
-      // fallback to parent's default queue
-      try {
-        scheduler.moveApplication(app.getApplicationId(), defReservationQueue);
-      } catch (YarnException e) {
-        LOG.warn(
-            "Encountered unexpected error during migration of application: {} from reservation: {}",
-            app, expiredReservation, e);
-      }
-    }
-  }
-
-  /**
-   * First sets entitlement of queues to zero to prevent new app submission.
-   * Then move all apps in the set of queues to the parent plan queue's default
-   * reservation queue if move is enabled. Finally cleanups the queue by killing
-   * any apps (if move is disabled or move failed) and removing the queue
-   */
-  private void cleanupExpiredQueues(boolean shouldMove, Set<String> toRemove,
-      String defReservationQueue) {
-    for (String expiredReservation : toRemove) {
-      try {
-        // reduce entitlement to 0
-        scheduler.setEntitlement(expiredReservation, new QueueEntitlement(0.0f,
-            0.0f));
-        if (shouldMove) {
-          moveAppsInQueueSync(expiredReservation, defReservationQueue);
-        }
-        if (scheduler.getAppsInQueue(expiredReservation).size() > 0) {
-          scheduler.killAllAppsInQueue(expiredReservation);
-          LOG.info("Killing applications in queue: {}", expiredReservation);
-        } else {
-          scheduler.removeQueue(expiredReservation);
-          LOG.info("Queue: " + expiredReservation + " removed");
-        }
-      } catch (YarnException e) {
-        LOG.warn("Exception while trying to expire reservation: {}",
-            expiredReservation, e);
-      }
     }
   }
 
   @Override
-  public synchronized void setPlans(Collection<Plan> plans) {
-    this.plans.clear();
-    this.plans.addAll(plans);
-  }
-
-  /**
-   * Sort in the order from the least new amount of resources asked (likely
-   * negative) to the highest. This prevents "order-of-operation" errors related
-   * to exceeding 100% capacity temporarily.
-   */
-  private List<ReservationAllocation> sortByDelta(
-      List<ReservationAllocation> currentReservations, long now) {
-    Collections.sort(currentReservations, new ReservationAllocationComparator(
-        scheduler, now));
-    return currentReservations;
+  protected Resource getPlanResources(
+      Plan plan, Queue queue, Resource clusterResources) {
+    PlanQueue planQueue = (PlanQueue)queue;
+    float planAbsCap = planQueue.getAbsoluteCapacity();
+    Resource planResources = Resources.multiply(clusterResources, planAbsCap);
+    plan.setTotalCapacity(planResources);
+    return planResources;
   }
 
-  private static class ReservationAllocationComparator implements
-      Comparator<ReservationAllocation> {
-    CapacityScheduler scheduler;
-    long now;
-
-    ReservationAllocationComparator(CapacityScheduler scheduler, long now) {
-      this.scheduler = scheduler;
-      this.now = now;
-    }
-
-    private Resource getUnallocatedReservedResources(
-        ReservationAllocation reservation) {
-      Resource resResource;
-      CSQueue resQueue =
-          scheduler.getQueue(reservation.getReservationId().toString());
-      if (resQueue != null) {
-        resResource =
-            Resources.subtract(
-                reservation.getResourcesAtTime(now),
-                Resources.multiply(scheduler.getClusterResource(),
-                    resQueue.getAbsoluteCapacity()));
-      } else {
-        resResource = reservation.getResourcesAtTime(now);
-      }
-      return resResource;
-    }
-
-    @Override
-    public int compare(ReservationAllocation lhs, ReservationAllocation rhs) {
-      // compute delta between current and previous reservation, and compare
-      // based on that
-      Resource lhsRes = getUnallocatedReservedResources(lhs);
-      Resource rhsRes = getUnallocatedReservedResources(rhs);
-      return lhsRes.compareTo(rhsRes);
-    }
-
+  @Override
+  protected Resource getReservationQueueResourceIfExists(Plan plan,
+      ReservationId reservationId) {
+    CSQueue resQueue = cs.getQueue(reservationId.toString());
+    Resource reservationResource = null;
+    if (resQueue != null) {
+      reservationResource = Resources.multiply(cs.getClusterResource(),
+          resQueue.getAbsoluteCapacity());
+    }
+    return reservationResource;
   }
 
 }

+ 5 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java

@@ -45,6 +45,7 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
 import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 
 /**
  * This interface is used by the components to talk to the
@@ -98,6 +99,10 @@ public interface YarnScheduler extends EventHandler<SchedulerEvent> {
   @Stable
   public Resource getMaximumResourceCapability();
 
+  @LimitedPrivate("yarn")
+  @Evolving
+  ResourceCalculator getResourceCalculator();
+
   /**
    * Get the number of nodes available in the cluster.
    * @return the number of available nodes.

+ 2 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java

@@ -544,9 +544,9 @@ public class FSLeafQueue extends FSQueue {
   }
 
   private boolean isStarved(Resource share) {
-    Resource desiredShare = Resources.min(FairScheduler.getResourceCalculator(),
+    Resource desiredShare = Resources.min(scheduler.getResourceCalculator(),
         scheduler.getClusterResource(), share, getDemand());
-    return Resources.lessThan(FairScheduler.getResourceCalculator(),
+    return Resources.lessThan(scheduler.getResourceCalculator(),
         scheduler.getClusterResource(), getResourceUsage(), desiredShare);
   }
 }

+ 2 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java

@@ -1094,7 +1094,8 @@ public class FairScheduler extends
     return super.getApplicationAttempt(appAttemptId);
   }
 
-  public static ResourceCalculator getResourceCalculator() {
+  @Override
+  public ResourceCalculator getResourceCalculator() {
     return RESOURCE_CALCULATOR;
   }
 

+ 5 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java

@@ -919,6 +919,11 @@ public class FifoScheduler extends
     return DEFAULT_QUEUE.getQueueUserAclInfo(null); 
   }
 
+  @Override
+  public ResourceCalculator getResourceCalculator() {
+    return resourceCalculator;
+  }
+
   private synchronized void addNode(RMNode nodeManager) {
     FiCaSchedulerNode schedulerNode = new FiCaSchedulerNode(nodeManager,
         usePortForNodeName);

+ 2 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java

@@ -104,7 +104,7 @@ public class ReservationSystemTestUtil {
         .assertTrue(newPlan.getSharingPolicy() instanceof CapacityOverTimePolicy);
   }
 
-  static void setupFSAllocationFile(String allocationFile)
+  public static void setupFSAllocationFile(String allocationFile)
       throws IOException {
     PrintWriter out = new PrintWriter(new FileWriter(allocationFile));
     out.println("<?xml version=\"1.0\"?>");
@@ -130,7 +130,7 @@ public class ReservationSystemTestUtil {
     out.close();
   }
 
-  static void updateFSAllocationFile(String allocationFile)
+  public static void updateFSAllocationFile(String allocationFile)
       throws IOException {
     PrintWriter out = new PrintWriter(new FileWriter(allocationFile));
     out.println("<?xml version=\"1.0\"?>");

+ 48 - 161
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacitySchedulerPlanFollower.java

@@ -33,25 +33,20 @@ import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ReservationId;
-import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerContext;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.PlanQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
 import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
 import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
-import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.Resources;
 import org.junit.After;
 import org.junit.Assert;
@@ -62,21 +57,12 @@ import org.junit.rules.TestName;
 import org.mockito.Matchers;
 import org.mockito.Mockito;
 
-public class TestCapacitySchedulerPlanFollower {
+public class TestCapacitySchedulerPlanFollower extends TestSchedulerPlanFollowerBase {
 
-  final static int GB = 1024;
-
-  private Clock mClock = null;
-  private CapacityScheduler scheduler = null;
   private RMContext rmContext;
   private RMContext spyRMContext;
   private CapacitySchedulerContext csContext;
-  private ReservationAgent mAgent;
-  private Plan plan;
-  private Resource minAlloc = Resource.newInstance(GB, 1);
-  private Resource maxAlloc = Resource.newInstance(GB * 8, 8);
-  private ResourceCalculator res = new DefaultResourceCalculator();
-  private CapacityOverTimePolicy policy = new CapacityOverTimePolicy();
+  private CapacityScheduler cs;
 
   @Rule
   public TestName name = new TestName();
@@ -84,7 +70,9 @@ public class TestCapacitySchedulerPlanFollower {
   @Before
   public void setUp() throws Exception {
     CapacityScheduler spyCs = new CapacityScheduler();
-    scheduler = spy(spyCs);
+    cs = spy(spyCs);
+    scheduler = cs;
+
     rmContext = TestUtils.getMockRMContext();
     spyRMContext = spy(rmContext);
 
@@ -100,7 +88,7 @@ public class TestCapacitySchedulerPlanFollower {
         new CapacitySchedulerConfiguration();
     ReservationSystemTestUtil.setupQueueConfiguration(csConf);
 
-    scheduler.setConf(csConf);
+    cs.setConf(csConf);
 
     csContext = mock(CapacitySchedulerContext.class);
     when(csContext.getConfiguration()).thenReturn(csConf);
@@ -119,9 +107,9 @@ public class TestCapacitySchedulerPlanFollower {
     when(csContext.getContainerTokenSecretManager()).thenReturn(
         containerTokenSecretManager);
 
-    scheduler.setRMContext(spyRMContext);
-    scheduler.init(csConf);
-    scheduler.start();
+    cs.setRMContext(spyRMContext);
+    cs.init(csConf);
+    cs.start();
 
     setupPlanFollower();
   }
@@ -132,7 +120,7 @@ public class TestCapacitySchedulerPlanFollower {
     mAgent = mock(ReservationAgent.class);
 
     String reservationQ = testUtil.getFullReservationQueueName();
-    CapacitySchedulerConfiguration csConf = scheduler.getConfiguration();
+    CapacitySchedulerConfiguration csConf = cs.getConfiguration();
     csConf.setReservationWindow(reservationQ, 20L);
     csConf.setMaximumCapacity(reservationQ, 40);
     csConf.setAverageCapacity(reservationQ, 20);
@@ -153,155 +141,51 @@ public class TestCapacitySchedulerPlanFollower {
     testPlanFollower(false);
   }
 
-  private void testPlanFollower(boolean isMove) throws PlanningException,
-      InterruptedException, AccessControlException {
-    // Initialize plan based on move flag
-    plan =
-        new InMemoryPlan(scheduler.getRootQueueMetrics(), policy, mAgent,
-            scheduler.getClusterResource(), 1L, res,
-            scheduler.getMinimumResourceCapability(), maxAlloc, "dedicated",
-            null, isMove);
-
-    // add a few reservations to the plan
-    long ts = System.currentTimeMillis();
-    ReservationId r1 = ReservationId.newInstance(ts, 1);
-    int[] f1 = { 10, 10, 10, 10, 10 };
-    assertTrue(plan.toString(),
-        plan.addReservation(new InMemoryReservationAllocation(r1, null, "u3",
-            "dedicated", 0, 0 + f1.length, ReservationSystemTestUtil
-                .generateAllocation(0L, 1L, f1), res, minAlloc)));
+  @Override
+  protected void verifyCapacity(Queue defQ) {
+    CSQueue csQueue = (CSQueue)defQ;
+    assertTrue(csQueue.getCapacity() > 0.9);
+  }
 
-    ReservationId r2 = ReservationId.newInstance(ts, 2);
-    assertTrue(plan.toString(),
-        plan.addReservation(new InMemoryReservationAllocation(r2, null, "u3",
-            "dedicated", 3, 3 + f1.length, ReservationSystemTestUtil
-                .generateAllocation(3L, 1L, f1), res, minAlloc)));
+  @Override
+  protected Queue getDefaultQueue() {
+    return cs.getQueue("dedicated" + PlanQueue.DEFAULT_QUEUE_SUFFIX);
+  }
 
-    ReservationId r3 = ReservationId.newInstance(ts, 3);
-    int[] f2 = { 0, 10, 20, 10, 0 };
-    assertTrue(plan.toString(),
-        plan.addReservation(new InMemoryReservationAllocation(r3, null, "u4",
-            "dedicated", 10, 10 + f2.length, ReservationSystemTestUtil
-                .generateAllocation(10L, 1L, f2), res, minAlloc)));
+  @Override
+  protected int getNumberOfApplications(Queue queue) {
+    CSQueue csQueue = (CSQueue)queue;
+    int numberOfApplications = csQueue.getNumApplications();
+    return numberOfApplications;
+  }
 
+  @Override
+  protected CapacitySchedulerPlanFollower createPlanFollower() {
     CapacitySchedulerPlanFollower planFollower =
         new CapacitySchedulerPlanFollower();
     planFollower.init(mClock, scheduler, Collections.singletonList(plan));
+    return planFollower;
+  }
 
-    when(mClock.getTime()).thenReturn(0L);
-    planFollower.run();
-
-    CSQueue defQ =
-        scheduler.getQueue("dedicated" + PlanQueue.DEFAULT_QUEUE_SUFFIX);
-    CSQueue q = scheduler.getQueue(r1.toString());
+  @Override
+  protected void assertReservationQueueExists(ReservationId r) {
+    CSQueue q = cs.getQueue(r.toString());
     assertNotNull(q);
-    // submit an app to r1
-    String user_0 = "test-user";
-    ApplicationId appId = ApplicationId.newInstance(0, 1);
-    ApplicationAttemptId appAttemptId_0 =
-        ApplicationAttemptId.newInstance(appId, 0);
-    AppAddedSchedulerEvent addAppEvent =
-        new AppAddedSchedulerEvent(appId, q.getQueueName(), user_0);
-    scheduler.handle(addAppEvent);
-    AppAttemptAddedSchedulerEvent appAttemptAddedEvent =
-        new AppAttemptAddedSchedulerEvent(appAttemptId_0, false);
-    scheduler.handle(appAttemptAddedEvent);
-
-    // initial default reservation queue should have no apps
-    Assert.assertEquals(0, defQ.getNumApplications());
-
-    Assert.assertEquals(0.1, q.getCapacity(), 0.01);
-    Assert.assertEquals(0.1, q.getMaximumCapacity(), 1.0);
-    Assert.assertEquals(1, q.getNumApplications());
-
-    CSQueue q2 = scheduler.getQueue(r2.toString());
-    assertNull(q2);
-    CSQueue q3 = scheduler.getQueue(r3.toString());
-    assertNull(q3);
-
-    when(mClock.getTime()).thenReturn(3L);
-    planFollower.run();
+  }
 
-    Assert.assertEquals(0, defQ.getNumApplications());
-    q = scheduler.getQueue(r1.toString());
+  @Override
+  protected void assertReservationQueueExists(ReservationId r2,
+      double expectedCapacity, double expectedMaxCapacity) {
+    CSQueue q = cs.getQueue(r2.toString());
     assertNotNull(q);
-    Assert.assertEquals(0.1, q.getCapacity(), 0.01);
-    Assert.assertEquals(0.1, q.getMaximumCapacity(), 1.0);
-    Assert.assertEquals(1, q.getNumApplications());
-    q2 = scheduler.getQueue(r2.toString());
-    assertNotNull(q2);
-    Assert.assertEquals(0.1, q.getCapacity(), 0.01);
-    Assert.assertEquals(0.1, q.getMaximumCapacity(), 1.0);
-    q3 = scheduler.getQueue(r3.toString());
-    assertNull(q3);
-
-    when(mClock.getTime()).thenReturn(10L);
-    planFollower.run();
-
-    q = scheduler.getQueue(r1.toString());
-    if (isMove) {
-      // app should have been moved to default reservation queue
-      Assert.assertEquals(1, defQ.getNumApplications());
-      assertNull(q);
-    } else {
-      // app should be killed
-      Assert.assertEquals(0, defQ.getNumApplications());
-      assertNotNull(q);
-      AppAttemptRemovedSchedulerEvent appAttemptRemovedEvent =
-          new AppAttemptRemovedSchedulerEvent(appAttemptId_0,
-              RMAppAttemptState.KILLED, false);
-      scheduler.handle(appAttemptRemovedEvent);
-    }
-    q2 = scheduler.getQueue(r2.toString());
-    assertNull(q2);
-    q3 = scheduler.getQueue(r3.toString());
-    assertNotNull(q3);
-    Assert.assertEquals(0, q3.getCapacity(), 0.01);
-    Assert.assertEquals(1.0, q3.getMaximumCapacity(), 1.0);
-
-    when(mClock.getTime()).thenReturn(11L);
-    planFollower.run();
-
-    if (isMove) {
-      // app should have been moved to default reservation queue
-      Assert.assertEquals(1, defQ.getNumApplications());
-    } else {
-      // app should be killed
-      Assert.assertEquals(0, defQ.getNumApplications());
-    }
-    q = scheduler.getQueue(r1.toString());
-    assertNull(q);
-    q2 = scheduler.getQueue(r2.toString());
-    assertNull(q2);
-    q3 = scheduler.getQueue(r3.toString());
-    assertNotNull(q3);
-    Assert.assertEquals(0.1, q3.getCapacity(), 0.01);
-    Assert.assertEquals(0.1, q3.getMaximumCapacity(), 1.0);
-
-    when(mClock.getTime()).thenReturn(12L);
-    planFollower.run();
-
-    q = scheduler.getQueue(r1.toString());
-    assertNull(q);
-    q2 = scheduler.getQueue(r2.toString());
-    assertNull(q2);
-    q3 = scheduler.getQueue(r3.toString());
-    assertNotNull(q3);
-    Assert.assertEquals(0.2, q3.getCapacity(), 0.01);
-    Assert.assertEquals(0.2, q3.getMaximumCapacity(), 1.0);
-
-    when(mClock.getTime()).thenReturn(16L);
-    planFollower.run();
+    Assert.assertEquals(expectedCapacity, q.getCapacity(), 0.01);
+    Assert.assertEquals(expectedMaxCapacity, q.getMaximumCapacity(), 1.0);
+  }
 
-    q = scheduler.getQueue(r1.toString());
-    assertNull(q);
-    q2 = scheduler.getQueue(r2.toString());
+  @Override
+  protected void assertReservationQueueDoesNotExist(ReservationId r2) {
+    CSQueue q2 = cs.getQueue(r2.toString());
     assertNull(q2);
-    q3 = scheduler.getQueue(r3.toString());
-    assertNull(q3);
-
-    assertTrue(defQ.getCapacity() > 0.9);
-
   }
 
   public static ApplicationACLsManager mockAppACLsManager() {
@@ -312,8 +196,11 @@ public class TestCapacitySchedulerPlanFollower {
   @After
   public void tearDown() throws Exception {
     if (scheduler != null) {
-      scheduler.stop();
+      cs.stop();
     }
   }
 
+  protected Queue getReservationQueue(String reservationId) {
+    return cs.getQueue(reservationId);
+  }
 }

+ 191 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestSchedulerPlanFollowerBase.java

@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.reservation;
+
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ReservationId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+import org.junit.Assert;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.when;
+
+public abstract class TestSchedulerPlanFollowerBase {
+  final static int GB = 1024;
+  protected Clock mClock = null;
+  protected ResourceScheduler scheduler = null;
+  protected ReservationAgent mAgent;
+  protected Resource minAlloc = Resource.newInstance(GB, 1);
+  protected Resource maxAlloc = Resource.newInstance(GB * 8, 8);
+  protected CapacityOverTimePolicy policy = new CapacityOverTimePolicy();
+  protected Plan plan;
+  private ResourceCalculator res = new DefaultResourceCalculator();
+
+  protected void testPlanFollower(boolean isMove) throws PlanningException,
+      InterruptedException, AccessControlException {
+    // Initialize plan based on move flag
+    plan =
+        new InMemoryPlan(scheduler.getRootQueueMetrics(), policy, mAgent,
+            scheduler.getClusterResource(), 1L, res,
+            scheduler.getMinimumResourceCapability(), maxAlloc, "dedicated",
+            null, isMove);
+
+    // add a few reservations to the plan
+    long ts = System.currentTimeMillis();
+    ReservationId r1 = ReservationId.newInstance(ts, 1);
+    int[] f1 = { 10, 10, 10, 10, 10 };
+    assertTrue(plan.toString(),
+        plan.addReservation(new InMemoryReservationAllocation(r1, null, "u3",
+            "dedicated", 0, 0 + f1.length, ReservationSystemTestUtil
+                .generateAllocation(0L, 1L, f1), res, minAlloc)));
+
+    ReservationId r2 = ReservationId.newInstance(ts, 2);
+    assertTrue(plan.toString(),
+        plan.addReservation(new InMemoryReservationAllocation(r2, null, "u3",
+            "dedicated", 3, 3 + f1.length, ReservationSystemTestUtil
+                .generateAllocation(3L, 1L, f1), res, minAlloc)));
+
+    ReservationId r3 = ReservationId.newInstance(ts, 3);
+    int[] f2 = { 0, 10, 20, 10, 0 };
+    assertTrue(plan.toString(),
+        plan.addReservation(new InMemoryReservationAllocation(r3, null, "u4",
+            "dedicated", 10, 10 + f2.length, ReservationSystemTestUtil
+                .generateAllocation(10L, 1L, f2), res, minAlloc)));
+
+    AbstractSchedulerPlanFollower planFollower = createPlanFollower();
+
+    when(mClock.getTime()).thenReturn(0L);
+    planFollower.run();
+
+    Queue q = getReservationQueue(r1.toString());
+    assertReservationQueueExists(r1);
+    // submit an app to r1
+    String user_0 = "test-user";
+    ApplicationId appId = ApplicationId.newInstance(0, 1);
+    ApplicationAttemptId appAttemptId_0 =
+        ApplicationAttemptId.newInstance(appId, 0);
+    AppAddedSchedulerEvent addAppEvent =
+        new AppAddedSchedulerEvent(appId, q.getQueueName(), user_0);
+    scheduler.handle(addAppEvent);
+    AppAttemptAddedSchedulerEvent appAttemptAddedEvent =
+        new AppAttemptAddedSchedulerEvent(appAttemptId_0, false);
+    scheduler.handle(appAttemptAddedEvent);
+
+    // initial default reservation queue should have no apps
+
+    Queue defQ = getDefaultQueue();
+    Assert.assertEquals(0, getNumberOfApplications(defQ));
+
+    assertReservationQueueExists(r1, 0.1, 0.1);
+    Assert.assertEquals(1, getNumberOfApplications(q));
+
+    assertReservationQueueDoesNotExist(r2);
+    assertReservationQueueDoesNotExist(r3);
+
+    when(mClock.getTime()).thenReturn(3L);
+    planFollower.run();
+
+    Assert.assertEquals(0, getNumberOfApplications(defQ));
+    assertReservationQueueExists(r1, 0.1, 0.1);
+    Assert.assertEquals(1, getNumberOfApplications(q));
+    assertReservationQueueExists(r2, 0.1, 0.1);
+    assertReservationQueueDoesNotExist(r3);
+
+    when(mClock.getTime()).thenReturn(10L);
+    planFollower.run();
+
+    q = getReservationQueue(r1.toString());
+    if (isMove) {
+      // app should have been moved to default reservation queue
+      Assert.assertEquals(1, getNumberOfApplications(defQ));
+      assertNull(q);
+    } else {
+      // app should be killed
+      Assert.assertEquals(0, getNumberOfApplications(defQ));
+      assertNotNull(q);
+      AppAttemptRemovedSchedulerEvent appAttemptRemovedEvent =
+          new AppAttemptRemovedSchedulerEvent(appAttemptId_0,
+              RMAppAttemptState.KILLED, false);
+      scheduler.handle(appAttemptRemovedEvent);
+    }
+    assertReservationQueueDoesNotExist(r2);
+    assertReservationQueueExists(r3, 0, 1.0);
+
+    when(mClock.getTime()).thenReturn(11L);
+    planFollower.run();
+
+    if (isMove) {
+      // app should have been moved to default reservation queue
+      Assert.assertEquals(1, getNumberOfApplications(defQ));
+    } else {
+      // app should be killed
+      Assert.assertEquals(0, getNumberOfApplications(defQ));
+    }
+    assertReservationQueueDoesNotExist(r1);
+    assertReservationQueueDoesNotExist(r2);
+    assertReservationQueueExists(r3, 0.1, 0.1);
+
+    when(mClock.getTime()).thenReturn(12L);
+    planFollower.run();
+
+    assertReservationQueueDoesNotExist(r1);
+    assertReservationQueueDoesNotExist(r2);
+    assertReservationQueueExists(r3, 0.2, 0.2);
+
+    when(mClock.getTime()).thenReturn(16L);
+    planFollower.run();
+
+    assertReservationQueueDoesNotExist(r1);
+    assertReservationQueueDoesNotExist(r2);
+    assertReservationQueueDoesNotExist(r3);
+
+    verifyCapacity(defQ);
+  }
+
+  protected abstract Queue getReservationQueue(String reservationId);
+
+  protected abstract void verifyCapacity(Queue defQ);
+
+  protected abstract Queue getDefaultQueue();
+
+  protected abstract int getNumberOfApplications(Queue queue);
+
+  protected abstract AbstractSchedulerPlanFollower createPlanFollower();
+
+  protected abstract void assertReservationQueueExists(ReservationId r);
+
+  protected abstract void assertReservationQueueExists(ReservationId r2,
+      double expectedCapacity, double expectedMaxCapacity);
+
+  protected abstract void assertReservationQueueDoesNotExist(ReservationId r2);
+}