Browse Source

YARN-3739. Add reservation system recovery to RM recovery process. Contributed by Subru Krishnan.

Anubhav Dhoot 10 năm trước cách đây
mục cha
commit
2798723a54
21 tập tin đã thay đổi với 556 bổ sung185 xóa
  1. 3 0
      hadoop-yarn-project/CHANGES.txt
  2. 1 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
  3. 4 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
  4. 73 18
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractReservationSystem.java
  5. 48 28
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractSchedulerPlanFollower.java
  6. 0 16
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacitySchedulerPlanFollower.java
  7. 0 15
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/FairSchedulerPlanFollower.java
  8. 21 14
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryPlan.java
  9. 4 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanEdit.java
  10. 3 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanFollower.java
  11. 5 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystem.java
  12. 1 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/PlanningAlgorithm.java
  13. 333 27
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestReservationSystemWithRMHA.java
  14. 0 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java
  15. 15 14
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacityOverTimePolicy.java
  16. 8 8
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestInMemoryPlan.java
  17. 10 9
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestNoOverCommitPolicy.java
  18. 8 9
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestSchedulerPlanFollowerBase.java
  19. 3 4
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestAlignedPlanner.java
  20. 9 8
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestGreedyReservationAgent.java
  21. 7 7
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestSimpleCapacityReplanner.java

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -238,6 +238,9 @@ Release 2.8.0 - UNRELEASED
     YARN-4262. Allow whitelisted users to run privileged docker containers.
     (Sidharta Seethana via vvasudev)
 
+    YARN-3739. Add reservation system recovery to RM recovery process.
+    (Subru Krishnan via adhoot)
+
   IMPROVEMENTS
 
     YARN-644. Basic null check is not performed on passed in arguments before

+ 1 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java

@@ -1363,7 +1363,7 @@ public class ClientRMService extends AbstractService implements
           .format(
               "Reservation {0} is within threshold so attempting to create synchronously.",
               reservationId));
-      reservationSystem.synchronizePlan(planName);
+      reservationSystem.synchronizePlan(planName, true);
       LOG.info(MessageFormat.format("Created reservation {0} synchronously.",
           reservationId));
     }

+ 4 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java

@@ -1186,6 +1186,10 @@ public class ResourceManager extends CompositeService implements Recoverable {
     // recover AMRMTokenSecretManager
     rmContext.getAMRMTokenSecretManager().recover(state);
 
+    // recover reservations
+    if (reservationSystem != null) {
+      reservationSystem.recover(state);
+    }
     // recover applications
     rmAppManager.recover(state);
 

+ 73 - 18
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractReservationSystem.java

@@ -18,16 +18,6 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.reservation;
 
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
 import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.conf.Configuration;
@@ -38,8 +28,11 @@ import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ReservationAllocationStateProto;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
 import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.Planner;
 import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
@@ -52,6 +45,17 @@ import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
 /**
  * This is the implementation of {@link ReservationSystem} based on the
  * {@link ResourceScheduler}
@@ -94,6 +98,8 @@ public abstract class AbstractReservationSystem extends AbstractService
 
   private PlanFollower planFollower;
 
+  private boolean isRecoveryEnabled = false;
+
   /**
    * Construct the service.
    * 
@@ -149,6 +155,49 @@ public abstract class AbstractReservationSystem extends AbstractService
       Plan plan = initializePlan(planQueueName);
       plans.put(planQueueName, plan);
     }
+    isRecoveryEnabled = conf.getBoolean(
+        YarnConfiguration.RECOVERY_ENABLED,
+        YarnConfiguration.DEFAULT_RM_RECOVERY_ENABLED);
+  }
+
+  private void loadPlan(String planName,
+      Map<ReservationId, ReservationAllocationStateProto> reservations)
+          throws PlanningException {
+    Plan plan = plans.get(planName);
+    Resource minAllocation = getMinAllocation();
+    ResourceCalculator rescCalculator = getResourceCalculator();
+    for (Entry<ReservationId, ReservationAllocationStateProto> currentReservation : reservations
+        .entrySet()) {
+      plan.addReservation(ReservationSystemUtil.toInMemoryAllocation(planName,
+          currentReservation.getKey(), currentReservation.getValue(),
+          minAllocation, rescCalculator), true);
+      resQMap.put(currentReservation.getKey(), planName);
+    }
+    LOG.info("Recovered reservations for Plan: {}", planName);
+  }
+
+  @Override
+  public void recover(RMState state) throws Exception {
+    LOG.info("Recovering Reservation system");
+    writeLock.lock();
+    try {
+      Map<String, Map<ReservationId, ReservationAllocationStateProto>> reservationSystemState =
+          state.getReservationState();
+      if (planFollower != null) {
+        for (String plan : plans.keySet()) {
+          // recover reservations if any from state store
+          if (reservationSystemState.containsKey(plan)) {
+            loadPlan(plan, reservationSystemState.get(plan));
+          }
+          synchronizePlan(plan, false);
+        }
+        startPlanFollower(conf.getLong(
+            YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS,
+            YarnConfiguration.DEFAULT_RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS));
+      }
+    } finally {
+      writeLock.unlock();
+    }
   }
 
   private void initializeNewPlans(Configuration conf) {
@@ -162,7 +211,7 @@ public abstract class AbstractReservationSystem extends AbstractService
           Plan plan = initializePlan(planQueueName);
           plans.put(planQueueName, plan);
         } else {
-          LOG.warn("Plan based on reservation queue {0} already exists.",
+          LOG.warn("Plan based on reservation queue {} already exists.",
               planQueueName);
         }
       }
@@ -236,18 +285,26 @@ public abstract class AbstractReservationSystem extends AbstractService
   }
 
   @Override
-  public void synchronizePlan(String planName) {
+  public void synchronizePlan(String planName, boolean shouldReplan) {
     writeLock.lock();
     try {
       Plan plan = plans.get(planName);
       if (plan != null) {
-        planFollower.synchronizePlan(plan);
+        planFollower.synchronizePlan(plan, shouldReplan);
       }
     } finally {
       writeLock.unlock();
     }
   }
 
+  private void startPlanFollower(long initialDelay) {
+    if (planFollower != null) {
+      scheduledExecutorService = new ScheduledThreadPoolExecutor(1);
+      scheduledExecutorService.scheduleWithFixedDelay(planFollower,
+          initialDelay, planStepSize, TimeUnit.MILLISECONDS);
+    }
+  }
+
   @Override
   public void serviceInit(Configuration conf) throws Exception {
     Configuration configuration = new Configuration(conf);
@@ -262,10 +319,8 @@ public abstract class AbstractReservationSystem extends AbstractService
 
   @Override
   public void serviceStart() throws Exception {
-    if (planFollower != null) {
-      scheduledExecutorService = new ScheduledThreadPoolExecutor(1);
-      scheduledExecutorService.scheduleWithFixedDelay(planFollower, 0L,
-          planStepSize, TimeUnit.MILLISECONDS);
+    if (!isRecoveryEnabled) {
+      startPlanFollower(planStepSize);
     }
     super.serviceStart();
   }
@@ -350,7 +405,7 @@ public abstract class AbstractReservationSystem extends AbstractService
             minAllocation, maxAllocation, planQueueName,
             getReplanner(planQueuePath), getReservationSchedulerConfiguration()
             .getMoveOnExpiry(planQueuePath), rmContext);
-    LOG.info("Intialized plan {0} based on reservable queue {1}",
+    LOG.info("Intialized plan {} based on reservable queue {}",
         plan.toString(), planQueueName);
     return plan;
   }

+ 48 - 28
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractSchedulerPlanFollower.java

@@ -27,8 +27,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement;
-
 import org.apache.hadoop.yarn.util.Clock;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.Resources;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -43,7 +43,7 @@ import java.util.Set;
 
 public abstract class AbstractSchedulerPlanFollower implements PlanFollower {
   private static final Logger LOG = LoggerFactory
-      .getLogger(CapacitySchedulerPlanFollower.class);
+      .getLogger(AbstractSchedulerPlanFollower.class);
 
   protected Collection<Plan> plans = new ArrayList<Plan>();
   protected YarnScheduler scheduler;
@@ -59,7 +59,7 @@ public abstract class AbstractSchedulerPlanFollower implements PlanFollower {
   @Override
   public synchronized void run() {
     for (Plan plan : plans) {
-      synchronizePlan(plan);
+      synchronizePlan(plan, true);
     }
   }
 
@@ -70,7 +70,7 @@ public abstract class AbstractSchedulerPlanFollower implements PlanFollower {
   }
 
   @Override
-  public synchronized void synchronizePlan(Plan plan) {
+  public synchronized void synchronizePlan(Plan plan, boolean shouldReplan) {
      String planQueueName = plan.getQueueName();
     if (LOG.isDebugEnabled()) {
       LOG.debug("Running plan follower edit policy for plan: " + planQueueName);
@@ -88,14 +88,12 @@ public abstract class AbstractSchedulerPlanFollower implements PlanFollower {
     Resource clusterResources = scheduler.getClusterResource();
     Resource planResources = getPlanResources(plan, planQueue,
         clusterResources);
-
     Set<ReservationAllocation> currentReservations =
         plan.getReservationsAtTime(now);
     Set<String> curReservationNames = new HashSet<String>();
     Resource reservedResources = Resource.newInstance(0, 0);
     int numRes = getReservedResources(now, currentReservations,
         curReservationNames, reservedResources);
-
     // create the default reservation queue if it doesnt exist
     String defReservationId = getReservationIdFromQueueName(planQueueName) +
         ReservationConstants.DEFAULT_QUEUE_SUFFIX;
@@ -104,14 +102,18 @@ public abstract class AbstractSchedulerPlanFollower implements PlanFollower {
     createDefaultReservationQueue(planQueueName, planQueue,
         defReservationId);
     curReservationNames.add(defReservationId);
-
     // if the resources dedicated to this plan has shrunk invoke replanner
-    if (arePlanResourcesLessThanReservations(clusterResources, planResources,
-        reservedResources)) {
-      try {
-        plan.getReplanner().plan(plan, null);
-      } catch (PlanningException e) {
-        LOG.warn("Exception while trying to replan: {}", planQueueName, e);
+    boolean shouldResize = false;
+    if (arePlanResourcesLessThanReservations(plan.getResourceCalculator(),
+        clusterResources, planResources, reservedResources)) {
+      if (shouldReplan) {
+        try {
+          plan.getReplanner().plan(plan, null);
+        } catch (PlanningException e) {
+          LOG.warn("Exception while trying to replan: {}", planQueueName, e);
+        }
+      } else {
+        shouldResize = true;
       }
     }
     // identify the reservations that have expired and new reservations that
@@ -133,7 +135,6 @@ public abstract class AbstractSchedulerPlanFollower implements PlanFollower {
     // garbage collect expired reservations
     cleanupExpiredQueues(planQueueName, plan.getMoveOnExpiry(), expired,
         defReservationQueue);
-
     // Add new reservations and update existing ones
     float totalAssignedCapacity = 0f;
     if (currentReservations != null) {
@@ -146,9 +147,8 @@ public abstract class AbstractSchedulerPlanFollower implements PlanFollower {
             planQueueName, e);
       }
       // sort allocations from the one giving up the most resources, to the
-      // one asking for the most
-      // avoid order-of-operation errors that temporarily violate 100%
-      // capacity bound
+      // one asking for the most avoid order-of-operation errors that
+      // temporarily violate 100% capacity bound
       List<ReservationAllocation> sortedAllocations =
           sortByDelta(
               new ArrayList<ReservationAllocation>(currentReservations), now,
@@ -162,10 +162,15 @@ public abstract class AbstractSchedulerPlanFollower implements PlanFollower {
         float targetCapacity = 0f;
         if (planResources.getMemory() > 0
             && planResources.getVirtualCores() > 0) {
+          if (shouldResize) {
+            capToAssign =
+                calculateReservationToPlanProportion(
+                    plan.getResourceCalculator(), planResources,
+                    reservedResources, capToAssign);
+          }
           targetCapacity =
-              calculateReservationToPlanRatio(clusterResources,
-                  planResources,
-                  capToAssign);
+              calculateReservationToPlanRatio(plan.getResourceCalculator(),
+                  clusterResources, planResources, capToAssign);
         }
         if (LOG.isDebugEnabled()) {
           LOG.debug(
@@ -211,7 +216,6 @@ public abstract class AbstractSchedulerPlanFollower implements PlanFollower {
     }
     LOG.info("Finished iteration of plan follower edit policy for plan: "
         + planQueueName);
-
     // Extension: update plan with app states,
     // useful to support smart replanning
   }
@@ -323,19 +327,35 @@ public abstract class AbstractSchedulerPlanFollower implements PlanFollower {
    */
   protected abstract Queue getPlanQueue(String planQueueName);
 
+  /**
+   * Resizes reservations based on currently available resources
+   */
+  private Resource calculateReservationToPlanProportion(
+      ResourceCalculator rescCalculator, Resource availablePlanResources,
+      Resource totalReservationResources, Resource reservationResources) {
+    return Resources.multiply(availablePlanResources, Resources.ratio(
+        rescCalculator, reservationResources, totalReservationResources));
+  }
+
   /**
    * Calculates ratio of reservationResources to planResources
    */
-  protected abstract float calculateReservationToPlanRatio(
-      Resource clusterResources, Resource planResources,
-      Resource reservationResources);
+  private float calculateReservationToPlanRatio(
+      ResourceCalculator rescCalculator, Resource clusterResources,
+      Resource planResources, Resource reservationResources) {
+    return Resources.divide(rescCalculator, clusterResources,
+        reservationResources, planResources);
+  }
 
   /**
    * Check if plan resources are less than expected reservation resources
    */
-  protected abstract boolean arePlanResourcesLessThanReservations(
-      Resource clusterResources, Resource planResources,
-      Resource reservedResources);
+  private boolean arePlanResourcesLessThanReservations(
+      ResourceCalculator rescCalculator, Resource clusterResources,
+      Resource planResources, Resource reservedResources) {
+    return Resources.greaterThan(rescCalculator, clusterResources,
+        reservedResources, planResources);
+  }
 
   /**
    * Get a list of reservation queues for this planQueue
@@ -363,7 +383,7 @@ public abstract class AbstractSchedulerPlanFollower implements PlanFollower {
       Plan plan, Queue queue, Resource clusterResources);
 
   /**
-   * Get reservation queue resources if it exists otherwise return null
+   * Get reservation queue resources if it exists otherwise return null.
    */
   protected abstract Resource getReservationQueueResourceIfExists(Plan plan,
       ReservationId reservationId);

+ 0 - 16
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacitySchedulerPlanFollower.java

@@ -80,22 +80,6 @@ public class CapacitySchedulerPlanFollower extends AbstractSchedulerPlanFollower
     return queue;
   }
 
-  @Override
-  protected float calculateReservationToPlanRatio(
-      Resource clusterResources, Resource planResources,
-      Resource reservationResources) {
-    return Resources.divide(cs.getResourceCalculator(),
-        clusterResources, reservationResources, planResources);
-  }
-
-  @Override
-  protected boolean arePlanResourcesLessThanReservations(
-      Resource clusterResources, Resource planResources,
-      Resource reservedResources) {
-    return Resources.greaterThan(cs.getResourceCalculator(),
-        clusterResources, reservedResources, planResources);
-  }
-
   @Override
   protected List<? extends Queue> getChildReservationQueues(Queue queue) {
     PlanQueue planQueue = (PlanQueue)queue;

+ 0 - 15
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/FairSchedulerPlanFollower.java

@@ -30,7 +30,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSParentQueu
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
 import org.apache.hadoop.yarn.util.Clock;
-import org.apache.hadoop.yarn.util.resource.Resources;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -59,20 +58,6 @@ public class FairSchedulerPlanFollower extends AbstractSchedulerPlanFollower {
     return planQueue;
   }
 
-  @Override
-  protected float calculateReservationToPlanRatio(Resource clusterResources,
-      Resource planResources, Resource capToAssign) {
-    return Resources.divide(fs.getResourceCalculator(),
-        clusterResources, capToAssign, planResources);
-  }
-
-  @Override
-  protected boolean arePlanResourcesLessThanReservations(Resource
-      clusterResources, Resource planResources, Resource reservedResources) {
-    return Resources.greaterThan(fs.getResourceCalculator(),
-        clusterResources, reservedResources, planResources);
-  }
-
   @Override
   protected List<? extends Queue> getChildReservationQueues(Queue queue) {
     FSQueue planQueue = (FSQueue)queue;

+ 21 - 14
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryPlan.java

@@ -33,6 +33,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.hadoop.yarn.api.records.ReservationId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
 import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
 import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.Planner;
 import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent;
@@ -54,7 +55,7 @@ public class InMemoryPlan implements Plan {
   private static final Logger LOG = LoggerFactory.getLogger(InMemoryPlan.class);
 
   private static final Resource ZERO_RESOURCE = Resource.newInstance(0, 0);
-  private final RMContext rmContext;
+  private final RMStateStore rmStateStore;
 
   private TreeMap<ReservationInterval, Set<InMemoryReservationAllocation>> currentReservations =
       new TreeMap<ReservationInterval, Set<InMemoryReservationAllocation>>();
@@ -112,7 +113,7 @@ public class InMemoryPlan implements Plan {
     this.replanner = replanner;
     this.getMoveOnExpiry = getMoveOnExpiry;
     this.clock = clock;
-    this.rmContext = rmContext;
+    this.rmStateStore = rmContext.getStateStore();
   }
 
   @Override
@@ -174,8 +175,8 @@ public class InMemoryPlan implements Plan {
   }
 
   @Override
-  public boolean addReservation(ReservationAllocation reservation)
-      throws PlanningException {
+  public boolean addReservation(ReservationAllocation reservation,
+      boolean isRecovering) throws PlanningException {
     // Verify the allocation is memory based otherwise it is not supported
     InMemoryReservationAllocation inMemReservation =
         (InMemoryReservationAllocation) reservation;
@@ -198,9 +199,16 @@ public class InMemoryPlan implements Plan {
       }
       // Validate if we can accept this reservation, throws exception if
       // validation fails
-      policy.validate(this, inMemReservation);
-      // we record here the time in which the allocation has been accepted
-      reservation.setAcceptanceTimestamp(clock.getTime());
+      if (!isRecovering) {
+        policy.validate(this, inMemReservation);
+        // we record here the time in which the allocation has been accepted
+        reservation.setAcceptanceTimestamp(clock.getTime());
+        if (rmStateStore != null) {
+          rmStateStore.storeNewReservation(
+              ReservationSystemUtil.buildStateProto(inMemReservation),
+              getQueueName(), inMemReservation.getReservationId().toString());
+        }
+      }
       ReservationInterval searchInterval =
           new ReservationInterval(inMemReservation.getStartTime(),
               inMemReservation.getEndTime());
@@ -217,9 +225,6 @@ public class InMemoryPlan implements Plan {
       currentReservations.put(searchInterval, reservations);
       reservationTable.put(inMemReservation.getReservationId(),
           inMemReservation);
-      rmContext.getStateStore().storeNewReservation(
-          ReservationSystemUtil.buildStateProto(inMemReservation),
-          getQueueName(), inMemReservation.getReservationId().toString());
       incrementAllocation(inMemReservation);
       LOG.info("Sucessfully added reservation: {} to plan.",
           inMemReservation.getReservationId());
@@ -253,7 +258,7 @@ public class InMemoryPlan implements Plan {
         return result;
       }
       try {
-        result = addReservation(reservation);
+        result = addReservation(reservation, false);
       } catch (PlanningException e) {
         LOG.error("Unable to update reservation: {} from plan due to {}.",
             reservation.getReservationId(), e.getMessage());
@@ -264,7 +269,7 @@ public class InMemoryPlan implements Plan {
         return result;
       } else {
         // rollback delete
-        addReservation(currReservation);
+        addReservation(currReservation, false);
         LOG.info("Rollbacked update reservation: {} from plan.",
             reservation.getReservationId());
         return result;
@@ -282,6 +287,10 @@ public class InMemoryPlan implements Plan {
     Set<InMemoryReservationAllocation> reservations =
         currentReservations.get(searchInterval);
     if (reservations != null) {
+      if (rmStateStore != null) {
+        rmStateStore.removeReservation(getQueueName(),
+            reservation.getReservationId().toString());
+      }
       if (!reservations.remove(reservation)) {
         LOG.error("Unable to remove reservation: {} from plan.",
             reservation.getReservationId());
@@ -298,8 +307,6 @@ public class InMemoryPlan implements Plan {
       throw new IllegalArgumentException(errMsg);
     }
     reservationTable.remove(reservation.getReservationId());
-    rmContext.getStateStore().removeReservation(
-        getQueueName(), reservation.getReservationId().toString());
     decrementAllocation(reservation);
     LOG.info("Sucessfully deleted reservation: {} in plan.",
         reservation.getReservationId());

+ 4 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanEdit.java

@@ -32,10 +32,12 @@ public interface PlanEdit extends PlanContext, PlanView {
    * 
    * @param reservation the {@link ReservationAllocation} to be added to the
    *          plan
+   * @param isRecovering flag to indicate if reservation is being added as part
+   *          of failover or not
    * @return true if addition is successful, false otherwise
    */
-  public boolean addReservation(ReservationAllocation reservation)
-      throws PlanningException;
+  public boolean addReservation(ReservationAllocation reservation,
+      boolean isRecovering) throws PlanningException;
 
   /**
    * Updates an existing {@link ReservationAllocation} in the plan. This is

+ 3 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanFollower.java

@@ -71,8 +71,10 @@ public interface PlanFollower extends Runnable {
    * start time is imminent.
    * 
    * @param plan the Plan to synchronize
+   * @param shouldReplan replan on reduction of plan capacity if true or
+   *          proportionally scale down reservations if false
    */
-  public void synchronizePlan(Plan plan);
+  public void synchronizePlan(Plan plan, boolean shouldReplan);
 
   /**
    * Setter for the list of plans.

+ 5 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystem.java

@@ -24,6 +24,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.ReservationId;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 
@@ -40,7 +41,7 @@ import java.util.Map;
  */
 @LimitedPrivate("yarn")
 @Unstable
-public interface ReservationSystem {
+public interface ReservationSystem extends Recoverable {
 
   /**
    * Set RMContext for {@link ReservationSystem}. This method should be called
@@ -82,8 +83,10 @@ public interface ReservationSystem {
    * the {@link ResourceScheduler}
    * 
    * @param planName the name of the {@link Plan} to be synchronized
+   * @param shouldReplan replan on reduction of plan capacity if true or
+   *          proportionally scale down reservations if false
    */
-  void synchronizePlan(String planName);
+  void synchronizePlan(String planName, boolean shouldReplan);
 
   /**
    * Return the time step (ms) at which the {@link PlanFollower} is invoked

+ 1 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/PlanningAlgorithm.java

@@ -94,7 +94,7 @@ public abstract class PlanningAlgorithm implements ReservationAgent {
     if (oldReservation != null) {
       return plan.updateReservation(capReservation);
     } else {
-      return plan.addReservation(capReservation);
+      return plan.addReservation(capReservation, false);
     }
 
   }

+ 333 - 27
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestReservationSystemWithRMHA.java

@@ -18,15 +18,20 @@
 package org.apache.hadoop.yarn.server.resourcemanager;
 
 import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse;
+import org.apache.hadoop.yarn.api.records.QueueInfo;
 import org.apache.hadoop.yarn.api.records.ReservationDefinition;
 import org.apache.hadoop.yarn.api.records.ReservationId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.DrainDispatcher;
 import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ReservationAllocationStateProto;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationAllocation;
 import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystemTestUtil;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
@@ -38,7 +43,7 @@ import org.junit.Test;
 
 import java.util.Map;
 
-public class TestReservationSystemWithRMHA extends RMHATestBase{
+public class TestReservationSystemWithRMHA extends RMHATestBase {
 
   @Override
   public void setup() throws Exception {
@@ -56,7 +61,7 @@ public class TestReservationSystemWithRMHA extends RMHATestBase{
   public void testSubmitReservationAndCheckAfterFailover() throws Exception {
     startRMs();
 
-    addNodeCapacityToPlan();
+    addNodeCapacityToPlan(rm1, 102400, 100);
 
     ClientRMService clientService = rm1.getClientRMService();
 
@@ -72,8 +77,6 @@ public class TestReservationSystemWithRMHA extends RMHATestBase{
     ReservationId reservationID = response.getReservationId();
     Assert.assertNotNull(reservationID);
     LOG.info("Submit reservation response: " + reservationID);
-    ReservationDefinition reservationDefinition = request
-        .getReservationDefinition();
 
     // Do the failover
     explicitFailover();
@@ -87,12 +90,11 @@ public class TestReservationSystemWithRMHA extends RMHATestBase{
     Assert.assertNotNull(reservationStateMap.get(reservationID));
   }
 
-
   @Test
   public void testUpdateReservationAndCheckAfterFailover() throws Exception {
     startRMs();
 
-    addNodeCapacityToPlan();
+    addNodeCapacityToPlan(rm1, 102400, 100);
 
     ClientRMService clientService = rm1.getClientRMService();
 
@@ -108,17 +110,15 @@ public class TestReservationSystemWithRMHA extends RMHATestBase{
     ReservationId reservationID = response.getReservationId();
     Assert.assertNotNull(reservationID);
     LOG.info("Submit reservation response: " + reservationID);
-    ReservationDefinition reservationDefinition = request
-        .getReservationDefinition();
-
+    ReservationDefinition reservationDefinition =
+        request.getReservationDefinition();
 
     // Change any field
 
     long newDeadline = reservationDefinition.getDeadline() + 100;
     reservationDefinition.setDeadline(newDeadline);
-    ReservationUpdateRequest updateRequest =
-        ReservationUpdateRequest.newInstance(
-          reservationDefinition, reservationID);
+    ReservationUpdateRequest updateRequest = ReservationUpdateRequest
+        .newInstance(reservationDefinition, reservationID);
     rm1.updateReservationState(updateRequest);
 
     // Do the failover
@@ -140,7 +140,7 @@ public class TestReservationSystemWithRMHA extends RMHATestBase{
   public void testDeleteReservationAndCheckAfterFailover() throws Exception {
     startRMs();
 
-    addNodeCapacityToPlan();
+    addNodeCapacityToPlan(rm1, 102400, 100);
 
     ClientRMService clientService = rm1.getClientRMService();
 
@@ -156,7 +156,6 @@ public class TestReservationSystemWithRMHA extends RMHATestBase{
     ReservationId reservationID = response.getReservationId();
     Assert.assertNotNull(reservationID);
 
-
     // Delete the reservation
     ReservationDeleteRequest deleteRequest =
         ReservationDeleteRequest.newInstance(reservationID);
@@ -168,32 +167,31 @@ public class TestReservationSystemWithRMHA extends RMHATestBase{
     rm2.registerNode("127.0.0.1:1", 102400, 100);
 
     RMState state = rm2.getRMContext().getStateStore().loadState();
-    Assert.assertNull(state.getReservationState().get(
-        ReservationSystemTestUtil.reservationQ));
+    Assert.assertNull(state.getReservationState()
+        .get(ReservationSystemTestUtil.reservationQ));
   }
 
-  private void addNodeCapacityToPlan() {
+  private void addNodeCapacityToPlan(MockRM rm, int memory, int vCores) {
     try {
-      rm1.registerNode("127.0.0.1:1", 102400, 100);
+      rm.registerNode("127.0.0.1:1", memory, vCores);
       int attempts = 10;
       do {
         DrainDispatcher dispatcher =
             (DrainDispatcher) rm1.getRMContext().getDispatcher();
         dispatcher.await();
-        rm1.getRMContext().getReservationSystem().synchronizePlan(
-            ReservationSystemTestUtil.reservationQ);
-        if (rm1.getRMContext().getReservationSystem().getPlan
-            (ReservationSystemTestUtil.reservationQ).getTotalCapacity()
+        rm.getRMContext().getReservationSystem()
+            .synchronizePlan(ReservationSystemTestUtil.reservationQ, false);
+        if (rm.getRMContext().getReservationSystem()
+            .getPlan(ReservationSystemTestUtil.reservationQ).getTotalCapacity()
             .getMemory() > 0) {
           break;
         }
         LOG.info("Waiting for node capacity to be added to plan");
         Thread.sleep(100);
-      }
-      while (attempts-- > 0);
+      } while (attempts-- > 0);
       if (attempts <= 0) {
-        Assert.fail("Exhausted attempts in checking if node capacity was " +
-            "added to the plan");
+        Assert.fail("Exhausted attempts in checking if node capacity was "
+            + "added to the plan");
       }
 
     } catch (Exception e) {
@@ -205,8 +203,316 @@ public class TestReservationSystemWithRMHA extends RMHATestBase{
     Clock clock = new UTCClock();
     long arrival = clock.getTime();
     long duration = 60000;
-    long deadline = (long) (arrival + 1.05 * duration);
+    long deadline = (long) (arrival + duration + 1500);
     return ReservationSystemTestUtil.createSimpleReservationRequest(4, arrival,
         deadline, duration);
   }
+
+  private void validateReservation(Plan plan, ReservationId resId,
+      ReservationDefinition rDef) {
+    ReservationAllocation reservation = plan.getReservationById(resId);
+    Assert.assertNotNull(reservation);
+    Assert.assertEquals(rDef.getDeadline(),
+        reservation.getReservationDefinition().getDeadline());
+  }
+
+  @Test
+  public void testSubmitReservationFailoverAndDelete() throws Exception {
+    startRMs();
+
+    addNodeCapacityToPlan(rm1, 102400, 100);
+
+    ClientRMService clientService = rm1.getClientRMService();
+
+    // create a reservation
+    ReservationSubmissionRequest request = createReservationSubmissionRequest();
+    ReservationSubmissionResponse response = null;
+    try {
+      response = clientService.submitReservation(request);
+    } catch (Exception e) {
+      Assert.fail(e.getMessage());
+    }
+    Assert.assertNotNull(response);
+    ReservationId reservationID = response.getReservationId();
+    Assert.assertNotNull(reservationID);
+    LOG.info("Submit reservation response: " + reservationID);
+    ReservationDefinition reservationDefinition =
+        request.getReservationDefinition();
+
+    // Do the failover
+    explicitFailover();
+
+    addNodeCapacityToPlan(rm2, 102400, 100);
+
+    // check if reservation exists after failover
+    Plan plan = rm2.getRMContext().getReservationSystem()
+        .getPlan(ReservationSystemTestUtil.reservationQ);
+    validateReservation(plan, reservationID, reservationDefinition);
+
+    // delete the reservation
+    ReservationDeleteRequest deleteRequest =
+        ReservationDeleteRequest.newInstance(reservationID);
+    ReservationDeleteResponse deleteResponse = null;
+    clientService = rm2.getClientRMService();
+    try {
+      deleteResponse = clientService.deleteReservation(deleteRequest);
+    } catch (Exception e) {
+      Assert.fail(e.getMessage());
+    }
+    Assert.assertNotNull(deleteResponse);
+    Assert.assertNull(plan.getReservationById(reservationID));
+  }
+
+  @Test
+  public void testFailoverAndSubmitReservation() throws Exception {
+    startRMs();
+
+    addNodeCapacityToPlan(rm1, 102400, 100);
+
+    // Do the failover
+    explicitFailover();
+
+    addNodeCapacityToPlan(rm2, 102400, 100);
+
+    // create a reservation
+    ClientRMService clientService = rm2.getClientRMService();
+    ReservationSubmissionRequest request = createReservationSubmissionRequest();
+    ReservationSubmissionResponse response = null;
+    try {
+      response = clientService.submitReservation(request);
+    } catch (Exception e) {
+      Assert.fail(e.getMessage());
+    }
+    Assert.assertNotNull(response);
+    ReservationId reservationID = response.getReservationId();
+    Assert.assertNotNull(reservationID);
+    LOG.info("Submit reservation response: " + reservationID);
+    ReservationDefinition reservationDefinition =
+        request.getReservationDefinition();
+
+    // check if reservation is submitted successfully
+    Plan plan = rm2.getRMContext().getReservationSystem()
+        .getPlan(ReservationSystemTestUtil.reservationQ);
+    validateReservation(plan, reservationID, reservationDefinition);
+  }
+
+  @Test
+  public void testSubmitReservationFailoverAndUpdate() throws Exception {
+    startRMs();
+
+    addNodeCapacityToPlan(rm1, 102400, 100);
+
+    ClientRMService clientService = rm1.getClientRMService();
+
+    // create a reservation
+    ReservationSubmissionRequest request = createReservationSubmissionRequest();
+    ReservationSubmissionResponse response = null;
+    try {
+      response = clientService.submitReservation(request);
+    } catch (Exception e) {
+      Assert.fail(e.getMessage());
+    }
+    Assert.assertNotNull(response);
+    ReservationId reservationID = response.getReservationId();
+    Assert.assertNotNull(reservationID);
+    LOG.info("Submit reservation response: " + reservationID);
+    ReservationDefinition reservationDefinition =
+        request.getReservationDefinition();
+
+    // Do the failover
+    explicitFailover();
+
+    addNodeCapacityToPlan(rm2, 102400, 100);
+
+    // check if reservation exists after failover
+    Plan plan = rm2.getRMContext().getReservationSystem()
+        .getPlan(ReservationSystemTestUtil.reservationQ);
+    validateReservation(plan, reservationID, reservationDefinition);
+
+    // update the reservation
+    long newDeadline = reservationDefinition.getDeadline() + 100;
+    reservationDefinition.setDeadline(newDeadline);
+    ReservationUpdateRequest updateRequest = ReservationUpdateRequest
+        .newInstance(reservationDefinition, reservationID);
+    ReservationUpdateResponse updateResponse = null;
+    clientService = rm2.getClientRMService();
+    try {
+      updateResponse = clientService.updateReservation(updateRequest);
+    } catch (Exception e) {
+      Assert.fail(e.getMessage());
+    }
+    Assert.assertNotNull(updateResponse);
+    validateReservation(plan, reservationID, reservationDefinition);
+  }
+
+  @Test
+  public void testSubmitUpdateReservationFailoverAndDelete() throws Exception {
+    startRMs();
+
+    addNodeCapacityToPlan(rm1, 102400, 100);
+
+    ClientRMService clientService = rm1.getClientRMService();
+
+    // create a reservation
+    ReservationSubmissionRequest request = createReservationSubmissionRequest();
+    ReservationSubmissionResponse response = null;
+    try {
+      response = clientService.submitReservation(request);
+    } catch (Exception e) {
+      Assert.fail(e.getMessage());
+    }
+    Assert.assertNotNull(response);
+    ReservationId reservationID = response.getReservationId();
+    Assert.assertNotNull(reservationID);
+    LOG.info("Submit reservation response: " + reservationID);
+    ReservationDefinition reservationDefinition =
+        request.getReservationDefinition();
+
+    // check if reservation is submitted successfully
+    Plan plan = rm1.getRMContext().getReservationSystem()
+        .getPlan(ReservationSystemTestUtil.reservationQ);
+    validateReservation(plan, reservationID, reservationDefinition);
+
+    // update the reservation
+    long newDeadline = reservationDefinition.getDeadline() + 100;
+    reservationDefinition.setDeadline(newDeadline);
+    ReservationUpdateRequest updateRequest = ReservationUpdateRequest
+        .newInstance(reservationDefinition, reservationID);
+    ReservationUpdateResponse updateResponse = null;
+    try {
+      updateResponse = clientService.updateReservation(updateRequest);
+    } catch (Exception e) {
+      Assert.fail(e.getMessage());
+    }
+    Assert.assertNotNull(updateResponse);
+    validateReservation(plan, reservationID, reservationDefinition);
+
+    // Do the failover
+    explicitFailover();
+
+    addNodeCapacityToPlan(rm2, 102400, 100);
+
+    // check if reservation exists after failover
+    plan = rm2.getRMContext().getReservationSystem()
+        .getPlan(ReservationSystemTestUtil.reservationQ);
+    validateReservation(plan, reservationID, reservationDefinition);
+
+    // delete the reservation
+    ReservationDeleteRequest deleteRequest =
+        ReservationDeleteRequest.newInstance(reservationID);
+    ReservationDeleteResponse deleteResponse = null;
+    clientService = rm2.getClientRMService();
+    try {
+      deleteResponse = clientService.deleteReservation(deleteRequest);
+    } catch (Exception e) {
+      Assert.fail(e.getMessage());
+    }
+    Assert.assertNotNull(deleteResponse);
+    Assert.assertNull(plan.getReservationById(reservationID));
+  }
+
+  @Test
+  public void testReservationResizeAfterFailover() throws Exception {
+    startRMs();
+
+    addNodeCapacityToPlan(rm1, 102400, 100);
+
+    ClientRMService clientService = rm1.getClientRMService();
+
+    // create 3 reservations
+    ReservationSubmissionRequest request = createReservationSubmissionRequest();
+    ReservationDefinition reservationDefinition =
+        request.getReservationDefinition();
+    ReservationSubmissionResponse response = null;
+    try {
+      response = clientService.submitReservation(request);
+    } catch (Exception e) {
+      Assert.fail(e.getMessage());
+    }
+    Assert.assertNotNull(response);
+    ReservationId resID1 = response.getReservationId();
+    Assert.assertNotNull(resID1);
+    LOG.info("Submit reservation response: " + resID1);
+    try {
+      response = clientService.submitReservation(request);
+    } catch (Exception e) {
+      Assert.fail(e.getMessage());
+    }
+    Assert.assertNotNull(response);
+    ReservationId resID2 = response.getReservationId();
+    Assert.assertNotNull(resID2);
+    LOG.info("Submit reservation response: " + resID2);
+    try {
+      response = clientService.submitReservation(request);
+    } catch (Exception e) {
+      Assert.fail(e.getMessage());
+    }
+    Assert.assertNotNull(response);
+    ReservationId resID3 = response.getReservationId();
+    Assert.assertNotNull(resID3);
+    LOG.info("Submit reservation response: " + resID3);
+
+    // allow the reservations to become active
+    waitForReservationActivation(rm1, resID1,
+        ReservationSystemTestUtil.reservationQ);
+
+    // validate reservations before failover
+    Plan plan = rm1.getRMContext().getReservationSystem()
+        .getPlan(ReservationSystemTestUtil.reservationQ);
+    validateReservation(plan, resID1, reservationDefinition);
+    validateReservation(plan, resID2, reservationDefinition);
+    validateReservation(plan, resID3, reservationDefinition);
+    ResourceScheduler scheduler = rm1.getResourceScheduler();
+    QueueInfo resQ1 = scheduler.getQueueInfo(resID1.toString(), false, false);
+    Assert.assertEquals(0.05, resQ1.getCapacity(), 0.001f);
+    QueueInfo resQ2 = scheduler.getQueueInfo(resID2.toString(), false, false);
+    Assert.assertEquals(0.05, resQ2.getCapacity(), 0.001f);
+    QueueInfo resQ3 = scheduler.getQueueInfo(resID3.toString(), false, false);
+    Assert.assertEquals(0.05, resQ3.getCapacity(), 0.001f);
+
+    // Do the failover
+    explicitFailover();
+
+    addNodeCapacityToPlan(rm2, 5120, 5);
+
+    // check if reservations exists after failover
+    plan = rm2.getRMContext().getReservationSystem()
+        .getPlan(ReservationSystemTestUtil.reservationQ);
+    validateReservation(plan, resID1, reservationDefinition);
+    validateReservation(plan, resID3, reservationDefinition);
+
+    // verify if the reservations have been resized
+    scheduler = rm2.getResourceScheduler();
+    resQ1 = scheduler.getQueueInfo(resID1.toString(), false, false);
+    Assert.assertEquals(1f / 3f, resQ1.getCapacity(), 0.001f);
+    resQ2 = scheduler.getQueueInfo(resID2.toString(), false, false);
+    Assert.assertEquals(1f / 3f, resQ2.getCapacity(), 0.001f);
+    resQ3 = scheduler.getQueueInfo(resID3.toString(), false, false);
+    Assert.assertEquals(1f / 3f, resQ3.getCapacity(), 0.001f);
+  }
+
+  private void waitForReservationActivation(MockRM rm,
+      ReservationId reservationId, String planName) {
+    try {
+      int attempts = 20;
+      do {
+        rm.getRMContext().getReservationSystem().synchronizePlan(planName,
+            false);
+        if (rm.getResourceScheduler()
+            .getQueueInfo(reservationId.toString(), false, false)
+            .getCapacity() > 0f) {
+          break;
+        }
+        LOG.info("Waiting for reservation to be active");
+        Thread.sleep(100);
+      } while (attempts-- > 0);
+      if (attempts <= 0) {
+        Assert
+            .fail("Exceeded attempts in waiting for reservation to be active");
+      }
+    } catch (Exception e) {
+      Assert.fail(e.getMessage());
+    }
+  }
+
 }

+ 0 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java

@@ -215,7 +215,6 @@ public class ReservationSystemTestUtil {
     return context;
   }
 
-  @SuppressWarnings("unchecked")
   public CapacityScheduler mockCapacityScheduler(int numContainers)
       throws IOException {
     // stolen from TestCapacityScheduler

+ 15 - 14
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacityOverTimePolicy.java

@@ -75,10 +75,11 @@ public class TestCapacityOverTimePolicy {
     maxAlloc = Resource.newInstance(1024 * 8, 8);
 
     mAgent = mock(ReservationAgent.class);
-    ReservationSystemTestUtil testUtil = new ReservationSystemTestUtil();
     QueueMetrics rootQueueMetrics = mock(QueueMetrics.class);
-    String reservationQ = testUtil.getFullReservationQueueName();
-    Resource clusterResource = testUtil.calculateClusterResource(totCont);
+    String reservationQ =
+        ReservationSystemTestUtil.getFullReservationQueueName();
+    Resource clusterResource =
+        ReservationSystemTestUtil.calculateClusterResource(totCont);
     ReservationSchedulerConfiguration conf =
         ReservationSystemTestUtil.createConf(reservationQ, timeWindow,
             instConstraint, avgConstraint);
@@ -113,7 +114,7 @@ public class TestCapacityOverTimePolicy {
             ReservationSystemTestUtil.getNewReservationId(), rDef, "u1",
             "dedicated", initTime, initTime + f.length,
             ReservationSystemTestUtil.generateAllocation(initTime, step, f),
-            res, minAlloc)));
+            res, minAlloc), false));
   }
 
   @Test
@@ -130,7 +131,7 @@ public class TestCapacityOverTimePolicy {
             ReservationSystemTestUtil.getNewReservationId(), rDef, "u1",
             "dedicated", initTime, initTime + f.length,
             ReservationSystemTestUtil.generateAllocation(initTime, step, f),
-            res, minAlloc)));
+            res, minAlloc), false));
   }
 
   @Test
@@ -146,7 +147,7 @@ public class TestCapacityOverTimePolicy {
               ReservationSystemTestUtil.getNewReservationId(), rDef, "u" + i,
               "dedicated", initTime, initTime + f.length,
               ReservationSystemTestUtil.generateAllocation(initTime, step, f),
-              res, minAlloc)));
+              res, minAlloc), false));
     }
   }
 
@@ -163,7 +164,7 @@ public class TestCapacityOverTimePolicy {
               ReservationSystemTestUtil.getNewReservationId(), rDef, "u" + i,
               "dedicated", initTime, initTime + f.length,
               ReservationSystemTestUtil.generateAllocation(initTime, step, f),
-              res, minAlloc)));
+              res, minAlloc), false));
     }
   }
 
@@ -179,7 +180,7 @@ public class TestCapacityOverTimePolicy {
             ReservationSystemTestUtil.getNewReservationId(), rDef, "u1",
             "dedicated", initTime, initTime + f.length,
             ReservationSystemTestUtil.generateAllocation(initTime, step, f),
-            res, minAlloc)));
+            res, minAlloc), false));
     Assert.fail("should not have accepted this");
   }
 
@@ -195,20 +196,20 @@ public class TestCapacityOverTimePolicy {
             ReservationSystemTestUtil.getNewReservationId(), rDef, "u1",
             "dedicated", initTime, initTime + f.length,
             ReservationSystemTestUtil.generateAllocation(initTime, step, f),
-            res, minAlloc)));
+            res, minAlloc), false));
     assertTrue(plan.toString(),
         plan.addReservation(new InMemoryReservationAllocation(
             ReservationSystemTestUtil.getNewReservationId(), rDef, "u1",
             "dedicated", initTime, initTime + f.length,
             ReservationSystemTestUtil.generateAllocation(initTime, step, f),
-            res, minAlloc)));
+            res, minAlloc), false));
     try {
       assertTrue(plan.toString(),
           plan.addReservation(new InMemoryReservationAllocation(
               ReservationSystemTestUtil.getNewReservationId(), rDef, "u1",
               "dedicated", initTime, initTime + f.length,
               ReservationSystemTestUtil.generateAllocation(initTime, step, f),
-              res, minAlloc)));
+              res, minAlloc), false));
       Assert.fail();
     } catch (PlanningQuotaException p) {
       // expected
@@ -232,7 +233,7 @@ public class TestCapacityOverTimePolicy {
     assertTrue(plan.toString(),
         plan.addReservation(new InMemoryReservationAllocation(
             ReservationSystemTestUtil.getNewReservationId(), rDef, "u1",
-            "dedicated", initTime, initTime + win, req, res, minAlloc)));
+            "dedicated", initTime, initTime + win, req, res, minAlloc), false));
   }
 
   @Test
@@ -251,13 +252,13 @@ public class TestCapacityOverTimePolicy {
     assertTrue(plan.toString(),
         plan.addReservation(new InMemoryReservationAllocation(
             ReservationSystemTestUtil.getNewReservationId(), rDef, "u1",
-            "dedicated", initTime, initTime + win, req, res, minAlloc)));
+            "dedicated", initTime, initTime + win, req, res, minAlloc), false));
 
     try {
       assertTrue(plan.toString(),
           plan.addReservation(new InMemoryReservationAllocation(
               ReservationSystemTestUtil.getNewReservationId(), null, "u1",
-              "dedicated", initTime, initTime + win, req, res, minAlloc)));
+              "dedicated", initTime, initTime + win, req, res, minAlloc), false));
 
       Assert.fail("should not have accepted this");
     } catch (PlanningQuotaException e) {

+ 8 - 8
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestInMemoryPlan.java

@@ -113,7 +113,7 @@ public class TestInMemoryPlan {
             start, start + alloc.length, allocs, resCalc, minAlloc);
     Assert.assertNull(plan.getReservationById(reservationID));
     try {
-      plan.addReservation(rAllocation);
+      plan.addReservation(rAllocation, false);
     } catch (PlanningException e) {
       Assert.fail(e.getMessage());
     }
@@ -147,7 +147,7 @@ public class TestInMemoryPlan {
             start, start + alloc.length, allocs, resCalc, minAlloc);
     Assert.assertNull(plan.getReservationById(reservationID));
     try {
-      plan.addReservation(rAllocation);
+      plan.addReservation(rAllocation, false);
     } catch (PlanningException e) {
       Assert.fail(e.getMessage());
     }
@@ -175,7 +175,7 @@ public class TestInMemoryPlan {
             start, start + alloc.length, allocs, resCalc, minAlloc);
     Assert.assertNull(plan.getReservationById(reservationID));
     try {
-      plan.addReservation(rAllocation);
+      plan.addReservation(rAllocation, false);
     } catch (PlanningException e) {
       Assert.fail(e.getMessage());
     }
@@ -189,7 +189,7 @@ public class TestInMemoryPlan {
 
     // Try to add it again
     try {
-      plan.addReservation(rAllocation);
+      plan.addReservation(rAllocation, false);
       Assert.fail("Add should fail as it already exists");
     } catch (IllegalArgumentException e) {
       Assert.assertTrue(e.getMessage().endsWith("already exists"));
@@ -221,7 +221,7 @@ public class TestInMemoryPlan {
             start, start + alloc.length, allocs, resCalc, minAlloc);
     Assert.assertNull(plan.getReservationById(reservationID));
     try {
-      plan.addReservation(rAllocation);
+      plan.addReservation(rAllocation, false);
     } catch (PlanningException e) {
       Assert.fail(e.getMessage());
     }
@@ -316,7 +316,7 @@ public class TestInMemoryPlan {
             start, start + alloc.length, allocs, resCalc, minAlloc);
     Assert.assertNull(plan.getReservationById(reservationID));
     try {
-      plan.addReservation(rAllocation);
+      plan.addReservation(rAllocation, false);
     } catch (PlanningException e) {
       Assert.fail(e.getMessage());
     }
@@ -388,7 +388,7 @@ public class TestInMemoryPlan {
             minAlloc);
     Assert.assertNull(plan.getReservationById(reservationID1));
     try {
-      plan.addReservation(rAllocation);
+      plan.addReservation(rAllocation, false);
     } catch (PlanningException e) {
       Assert.fail(e.getMessage());
     }
@@ -419,7 +419,7 @@ public class TestInMemoryPlan {
             minAlloc);
     Assert.assertNull(plan.getReservationById(reservationID2));
     try {
-      plan.addReservation(rAllocation);
+      plan.addReservation(rAllocation, false);
     } catch (PlanningException e) {
       Assert.fail(e.getMessage());
     }

+ 10 - 9
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestNoOverCommitPolicy.java

@@ -61,10 +61,11 @@ public class TestNoOverCommitPolicy {
     maxAlloc = Resource.newInstance(1024 * 8, 8);
 
     mAgent = mock(ReservationAgent.class);
-    ReservationSystemTestUtil testUtil = new ReservationSystemTestUtil();
-    String reservationQ = testUtil.getFullReservationQueueName();
+    String reservationQ =
+        ReservationSystemTestUtil.getFullReservationQueueName();
     QueueMetrics rootQueueMetrics = mock(QueueMetrics.class);
-    Resource clusterResource = testUtil.calculateClusterResource(totCont);
+    Resource clusterResource =
+        ReservationSystemTestUtil.calculateClusterResource(totCont);
     ReservationSchedulerConfiguration conf = mock
         (ReservationSchedulerConfiguration.class);
     NoOverCommitPolicy policy = new NoOverCommitPolicy();
@@ -97,7 +98,7 @@ public class TestNoOverCommitPolicy {
             ReservationSystemTestUtil.getNewReservationId(), rDef, "u1",
             "dedicated", initTime, initTime + f.length,
             ReservationSystemTestUtil.generateAllocation(initTime, step, f),
-            res, minAlloc)));
+            res, minAlloc), false));
   }
 
   @Test
@@ -113,7 +114,7 @@ public class TestNoOverCommitPolicy {
             ReservationSystemTestUtil.getNewReservationId(), rDef, "u1",
             "dedicated", initTime, initTime + f.length,
             ReservationSystemTestUtil.generateAllocation(initTime, step, f),
-            res, minAlloc)));
+            res, minAlloc), false));
   }
 
   @Test(expected = ResourceOverCommitException.class)
@@ -123,7 +124,7 @@ public class TestNoOverCommitPolicy {
     plan.addReservation(new InMemoryReservationAllocation(
         ReservationSystemTestUtil.getNewReservationId(), null, "u1",
         "dedicated", initTime, initTime + f.length, ReservationSystemTestUtil
-            .generateAllocation(initTime, step, f), res, minAlloc));
+            .generateAllocation(initTime, step, f), res, minAlloc), false);
   }
 
   @Test(expected = MismatchedUserException.class)
@@ -137,7 +138,7 @@ public class TestNoOverCommitPolicy {
 
     plan.addReservation(new InMemoryReservationAllocation(rid, rDef, "u1",
         "dedicated", initTime, initTime + f.length, ReservationSystemTestUtil
-            .generateAllocation(initTime, step, f), res, minAlloc));
+            .generateAllocation(initTime, step, f), res, minAlloc), false);
 
     // trying to update a reservation with a mismatching user
     plan.updateReservation(new InMemoryReservationAllocation(rid, rDef, "u2",
@@ -158,7 +159,7 @@ public class TestNoOverCommitPolicy {
               ReservationSystemTestUtil.getNewReservationId(), rDef, "u" + i,
               "dedicated", initTime, initTime + f.length,
               ReservationSystemTestUtil.generateAllocation(initTime, step, f),
-              res, minAlloc)));
+              res, minAlloc), false));
     }
   }
 
@@ -175,7 +176,7 @@ public class TestNoOverCommitPolicy {
               ReservationSystemTestUtil.getNewReservationId(), rDef, "u" + i,
               "dedicated", initTime, initTime + f.length,
               ReservationSystemTestUtil.generateAllocation(initTime, step, f),
-              res, minAlloc)));
+              res, minAlloc), false));
     }
   }
 }

+ 8 - 9
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestSchedulerPlanFollowerBase.java

@@ -18,6 +18,11 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.reservation;
 
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.when;
+
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -38,12 +43,6 @@ import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.junit.Assert;
 
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
 public abstract class TestSchedulerPlanFollowerBase {
   final static int GB = 1024;
   protected Clock mClock = null;
@@ -75,20 +74,20 @@ public abstract class TestSchedulerPlanFollowerBase {
     assertTrue(plan.toString(),
         plan.addReservation(new InMemoryReservationAllocation(r1, rDef, "u3",
             "dedicated", 0, 0 + f1.length, ReservationSystemTestUtil
-                .generateAllocation(0L, 1L, f1), res, minAlloc)));
+                .generateAllocation(0L, 1L, f1), res, minAlloc), false));
 
     ReservationId r2 = ReservationId.newInstance(ts, 2);
     assertTrue(plan.toString(),
         plan.addReservation(new InMemoryReservationAllocation(r2, rDef, "u3",
             "dedicated", 3, 3 + f1.length, ReservationSystemTestUtil
-                .generateAllocation(3L, 1L, f1), res, minAlloc)));
+                .generateAllocation(3L, 1L, f1), res, minAlloc), false));
 
     ReservationId r3 = ReservationId.newInstance(ts, 3);
     int[] f2 = { 0, 10, 20, 10, 0 };
     assertTrue(plan.toString(),
         plan.addReservation(new InMemoryReservationAllocation(r3, rDef, "u4",
             "dedicated", 10, 10 + f2.length, ReservationSystemTestUtil
-                .generateAllocation(10L, 1L, f2), res, minAlloc)));
+                .generateAllocation(10L, 1L, f2), res, minAlloc), false));
 
     AbstractSchedulerPlanFollower planFollower = createPlanFollower();
 

+ 3 - 4
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestAlignedPlanner.java

@@ -711,9 +711,8 @@ public class TestAlignedPlanner {
 
     Resource clusterCapacity = Resource.newInstance(capacityMem, capacityCores);
 
-    // Set configuration
-    ReservationSystemTestUtil testUtil = new ReservationSystemTestUtil();
-    String reservationQ = testUtil.getFullReservationQueueName();
+    String reservationQ =
+        ReservationSystemTestUtil.getFullReservationQueueName();
     float instConstraint = 100;
     float avgConstraint = 100;
 
@@ -792,7 +791,7 @@ public class TestAlignedPlanner {
             ReservationSystemTestUtil.getNewReservationId(), rDef,
             "user_fixed", "dedicated", start, start + f.length * step,
             ReservationSystemTestUtil.generateAllocation(start, step, f), res,
-            minAlloc)));
+            minAlloc), false));
 
   }
 

+ 9 - 8
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestGreedyReservationAgent.java

@@ -21,7 +21,6 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
-
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -76,8 +75,8 @@ public class TestGreedyReservationAgent {
     long timeWindow = 1000000L;
     Resource clusterCapacity = Resource.newInstance(100 * 1024, 100);
     step = 1000L;
-    ReservationSystemTestUtil testUtil = new ReservationSystemTestUtil();
-    String reservationQ = testUtil.getFullReservationQueueName();
+    String reservationQ =
+        ReservationSystemTestUtil.getFullReservationQueueName();
 
     float instConstraint = 100;
     float avgConstraint = 100;
@@ -151,7 +150,7 @@ public class TestGreedyReservationAgent {
             ReservationSystemTestUtil.getNewReservationId(), rDef, "u1",
             "dedicated", 30 * step, 30 * step + f.length * step,
             ReservationSystemTestUtil.generateAllocation(30 * step, step, f),
-            res, minAlloc)));
+            res, minAlloc), false));
 
     // create a chain of 4 RR, mixing gang and non-gang
     ReservationDefinition rr = new ReservationDefinitionPBImpl();
@@ -208,7 +207,7 @@ public class TestGreedyReservationAgent {
             ReservationSystemTestUtil.getNewReservationId(), rDef, "u1",
             "dedicated", 30 * step, 30 * step + f.length * step,
             ReservationSystemTestUtil.generateAllocation(30 * step, step, f),
-            res, minAlloc)));
+            res, minAlloc), false));
 
     // create a chain of 4 RR, mixing gang and non-gang
     ReservationDefinition rr = new ReservationDefinitionPBImpl();
@@ -529,7 +528,7 @@ public class TestGreedyReservationAgent {
         plan.addReservation(new InMemoryReservationAllocation(
             ReservationSystemTestUtil.getNewReservationId(), rDef, "u1",
             "dedicated", 0L, 0L + f.length * step, ReservationSystemTestUtil
-                .generateAllocation(0, step, f), res, minAlloc)));
+                .generateAllocation(0, step, f), res, minAlloc), false));
 
     int[] f2 = { 5, 5, 5, 5, 5, 5, 5 };
     Map<ReservationInterval, Resource> alloc =
@@ -537,7 +536,8 @@ public class TestGreedyReservationAgent {
     assertTrue(plan.toString(),
         plan.addReservation(new InMemoryReservationAllocation(
             ReservationSystemTestUtil.getNewReservationId(), rDef, "u1",
-            "dedicated", 5000, 5000 + f2.length * step, alloc, res, minAlloc)));
+            "dedicated", 5000, 5000 + f2.length * step, alloc, res, minAlloc),
+        false));
 
     System.out.println("--------BEFORE AGENT----------");
     System.out.println(plan.toString());
@@ -563,7 +563,8 @@ public class TestGreedyReservationAgent {
     step = 1000L;
     ReservationSystemTestUtil testUtil = new ReservationSystemTestUtil();
     CapacityScheduler scheduler = testUtil.mockCapacityScheduler(500 * 100);
-    String reservationQ = testUtil.getFullReservationQueueName();
+    String reservationQ =
+        ReservationSystemTestUtil.getFullReservationQueueName();
     float instConstraint = 100;
     float avgConstraint = 100;
     ReservationSchedulerConfiguration conf =

+ 7 - 7
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestSimpleCapacityReplanner.java

@@ -93,44 +93,44 @@ public class TestSimpleCapacityReplanner {
     assertTrue(plan.toString(),
         plan.addReservation(new InMemoryReservationAllocation(r1, rDef, "u3",
             "dedicated", 0, 0 + f5.length, generateAllocation(0, f5), res,
-            minAlloc)));
+            minAlloc), false));
     when(clock.getTime()).thenReturn(1L);
     ReservationId r2 = ReservationId.newInstance(ts, 2);
     assertTrue(plan.toString(),
         plan.addReservation(new InMemoryReservationAllocation(r2, rDef, "u4",
             "dedicated", 0, 0 + f5.length, generateAllocation(0, f5), res,
-            minAlloc)));
+            minAlloc), false));
     when(clock.getTime()).thenReturn(2L);
     ReservationId r3 = ReservationId.newInstance(ts, 3);
     assertTrue(plan.toString(),
         plan.addReservation(new InMemoryReservationAllocation(r3, rDef, "u5",
             "dedicated", 0, 0 + f5.length, generateAllocation(0, f5), res,
-            minAlloc)));
+            minAlloc), false));
     when(clock.getTime()).thenReturn(3L);
     ReservationId r4 = ReservationId.newInstance(ts, 4);
     assertTrue(plan.toString(),
         plan.addReservation(new InMemoryReservationAllocation(r4, rDef, "u6",
             "dedicated", 0, 0 + f5.length, generateAllocation(0, f5), res,
-            minAlloc)));
+            minAlloc), false));
     when(clock.getTime()).thenReturn(4L);
     ReservationId r5 = ReservationId.newInstance(ts, 5);
     assertTrue(plan.toString(),
         plan.addReservation(new InMemoryReservationAllocation(r5, rDef, "u7",
             "dedicated", 0, 0 + f5.length, generateAllocation(0, f5), res,
-            minAlloc)));
+            minAlloc), false));
 
     int[] f6 = { 50, 50, 50, 50, 50 };
     ReservationId r6 = ReservationId.newInstance(ts, 6);
     assertTrue(plan.toString(),
         plan.addReservation(new InMemoryReservationAllocation(r6, rDef, "u3",
             "dedicated", 10, 10 + f6.length, generateAllocation(10, f6), res,
-            minAlloc)));
+            minAlloc), false));
     when(clock.getTime()).thenReturn(6L);
     ReservationId r7 = ReservationId.newInstance(ts, 7);
     assertTrue(plan.toString(),
         plan.addReservation(new InMemoryReservationAllocation(r7, rDef, "u4",
             "dedicated", 10, 10 + f6.length, generateAllocation(10, f6), res,
-            minAlloc)));
+            minAlloc), false));
 
     // remove some of the resources (requires replanning)
     plan.setTotalCapacity(Resource.newInstance(70 * 1024, 70));