Browse Source

YARN-3985. Make ReservationSystem persist state using RMStateStore reservation APIs. (adhoot via asuresh)

Arun Suresh 10 năm trước cách đây
mục cha
commit
506d1b1dbc
15 tập tin đã thay đổi với 407 bổ sung79 xóa
  1. 2 0
      hadoop-yarn-project/CHANGES.txt
  2. 1 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractReservationSystem.java
  3. 14 3
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryPlan.java
  4. 7 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
  5. 14 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/RMHATestBase.java
  6. 2 22
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
  7. 212 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestReservationSystemWithRMHA.java
  8. 26 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java
  9. 39 13
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacityOverTimePolicy.java
  10. 12 8
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestInMemoryPlan.java
  11. 26 8
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestNoOverCommitPolicy.java
  12. 11 4
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestSchedulerPlanFollowerBase.java
  13. 7 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestAlignedPlanner.java
  14. 19 8
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestGreedyReservationAgent.java
  15. 15 8
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestSimpleCapacityReplanner.java

+ 2 - 0
hadoop-yarn-project/CHANGES.txt

@@ -522,6 +522,8 @@ Release 2.8.0 - UNRELEASED
     YARN-4267. Add additional logging to container launch implementations in
     container-executor. (Sidharta Seethana via vvasudev)
 
+    YARN-3985. Make ReservationSystem persist state using RMStateStore
+    reservation APIs. (adhoot via asuresh)
 
   OPTIMIZATIONS
 

+ 1 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractReservationSystem.java

@@ -349,7 +349,7 @@ public abstract class AbstractReservationSystem extends AbstractService
             getAgent(planQueuePath), totCap, planStepSize, rescCalc,
             minAllocation, maxAllocation, planQueueName,
             getReplanner(planQueuePath), getReservationSchedulerConfiguration()
-            .getMoveOnExpiry(planQueuePath));
+            .getMoveOnExpiry(planQueuePath), rmContext);
     LOG.info("Intialized plan {0} based on reservable queue {1}",
         plan.toString(), planQueueName);
     return plan;

+ 14 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryPlan.java

@@ -32,6 +32,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.hadoop.yarn.api.records.ReservationId;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
 import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.Planner;
 import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent;
@@ -53,6 +54,7 @@ public class InMemoryPlan implements Plan {
   private static final Logger LOG = LoggerFactory.getLogger(InMemoryPlan.class);
 
   private static final Resource ZERO_RESOURCE = Resource.newInstance(0, 0);
+  private final RMContext rmContext;
 
   private TreeMap<ReservationInterval, Set<InMemoryReservationAllocation>> currentReservations =
       new TreeMap<ReservationInterval, Set<InMemoryReservationAllocation>>();
@@ -85,15 +87,18 @@ public class InMemoryPlan implements Plan {
   public InMemoryPlan(QueueMetrics queueMetrics, SharingPolicy policy,
       ReservationAgent agent, Resource totalCapacity, long step,
       ResourceCalculator resCalc, Resource minAlloc, Resource maxAlloc,
-      String queueName, Planner replanner, boolean getMoveOnExpiry) {
+      String queueName, Planner replanner, boolean getMoveOnExpiry,
+      RMContext rmContext) {
     this(queueMetrics, policy, agent, totalCapacity, step, resCalc, minAlloc,
-        maxAlloc, queueName, replanner, getMoveOnExpiry, new UTCClock());
+        maxAlloc, queueName, replanner, getMoveOnExpiry, rmContext,
+        new UTCClock());
   }
 
   public InMemoryPlan(QueueMetrics queueMetrics, SharingPolicy policy,
       ReservationAgent agent, Resource totalCapacity, long step,
       ResourceCalculator resCalc, Resource minAlloc, Resource maxAlloc,
-      String queueName, Planner replanner, boolean getMoveOnExpiry, Clock clock) {
+      String queueName, Planner replanner, boolean getMoveOnExpiry,
+      RMContext rmContext, Clock clock) {
     this.queueMetrics = queueMetrics;
     this.policy = policy;
     this.agent = agent;
@@ -107,6 +112,7 @@ public class InMemoryPlan implements Plan {
     this.replanner = replanner;
     this.getMoveOnExpiry = getMoveOnExpiry;
     this.clock = clock;
+    this.rmContext = rmContext;
   }
 
   @Override
@@ -211,6 +217,9 @@ public class InMemoryPlan implements Plan {
       currentReservations.put(searchInterval, reservations);
       reservationTable.put(inMemReservation.getReservationId(),
           inMemReservation);
+      rmContext.getStateStore().storeNewReservation(
+          ReservationSystemUtil.buildStateProto(inMemReservation),
+          getQueueName(), inMemReservation.getReservationId().toString());
       incrementAllocation(inMemReservation);
       LOG.info("Sucessfully added reservation: {} to plan.",
           inMemReservation.getReservationId());
@@ -289,6 +298,8 @@ public class InMemoryPlan implements Plan {
       throw new IllegalArgumentException(errMsg);
     }
     reservationTable.remove(reservation.getReservationId());
+    rmContext.getStateStore().removeReservation(
+        getQueueName(), reservation.getReservationId().toString());
     decrementAllocation(reservation);
     LOG.info("Sucessfully deleted reservation: {} in plan.",
         reservation.getReservationId());

+ 7 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java

@@ -41,6 +41,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
@@ -815,6 +816,12 @@ public class MockRM extends ResourceManager {
     return response.getApplicationReport();
   }
 
+  public void updateReservationState(ReservationUpdateRequest request)
+      throws IOException, YarnException {
+    ApplicationClientProtocol client = getClientRMService();
+    client.updateReservation(request);
+  }
+
   // Explicitly reset queue metrics for testing.
   @SuppressWarnings("static-access")
   public void clearQueueMetrics(RMApp app) {

+ 14 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/RMHATestBase.java

@@ -29,6 +29,8 @@ import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.conf.HAUtil;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.DrainDispatcher;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
@@ -106,8 +108,18 @@ public abstract class RMHATestBase extends ClientBaseWithFixes{
   }
 
   protected void startRMs() throws IOException {
-    rm1 = new MockRM(confForRM1, null, false);
-    rm2 = new MockRM(confForRM2, null, false);
+    rm1 = new MockRM(confForRM1, null, false){
+      @Override
+      protected Dispatcher createDispatcher() {
+        return new DrainDispatcher();
+      }
+    };
+    rm2 = new MockRM(confForRM2, null, false){
+      @Override
+      protected Dispatcher createDispatcher() {
+        return new DrainDispatcher();
+      }
+    };
     startRMs(rm1, confForRM1, rm2, confForRM2);
 
   }

+ 2 - 22
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java

@@ -32,7 +32,6 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -103,8 +102,6 @@ import org.apache.hadoop.yarn.api.records.QueueInfo;
 import org.apache.hadoop.yarn.api.records.ReservationDefinition;
 import org.apache.hadoop.yarn.api.records.ReservationId;
 import org.apache.hadoop.yarn.api.records.ReservationRequest;
-import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter;
-import org.apache.hadoop.yarn.api.records.ReservationRequests;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
@@ -1115,7 +1112,8 @@ public class TestClientRMService {
     long duration = 60000;
     long deadline = (long) (arrival + 1.05 * duration);
     ReservationSubmissionRequest sRequest =
-        createSimpleReservationRequest(4, arrival, deadline, duration);
+        ReservationSystemTestUtil.createSimpleReservationRequest(4, arrival,
+            deadline, duration);
     ReservationSubmissionResponse sResponse = null;
     try {
       sResponse = clientService.submitReservation(sRequest);
@@ -1167,24 +1165,6 @@ public class TestClientRMService {
     rm = null;
   }
 
-  private ReservationSubmissionRequest createSimpleReservationRequest(
-      int numContainers, long arrival, long deadline, long duration) {
-    // create a request with a single atomic ask
-    ReservationRequest r =
-        ReservationRequest.newInstance(Resource.newInstance(1024, 1),
-            numContainers, 1, duration);
-    ReservationRequests reqs =
-        ReservationRequests.newInstance(Collections.singletonList(r),
-            ReservationRequestInterpreter.R_ALL);
-    ReservationDefinition rDef =
-        ReservationDefinition.newInstance(arrival, deadline, reqs,
-            "testClientRMService#reservation");
-    ReservationSubmissionRequest request =
-        ReservationSubmissionRequest.newInstance(rDef,
-            ReservationSystemTestUtil.reservationQ);
-    return request;
-  }
-  
   @Test
   public void testGetNodeLabels() throws Exception {
     MockRM rm = new MockRM() {

+ 212 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestReservationSystemWithRMHA.java

@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager;
+
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
+import org.apache.hadoop.yarn.api.records.ReservationDefinition;
+import org.apache.hadoop.yarn.api.records.ReservationId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.DrainDispatcher;
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ReservationAllocationStateProto;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystemTestUtil;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.hadoop.yarn.util.UTCClock;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Map;
+
+public class TestReservationSystemWithRMHA extends RMHATestBase{
+
+  @Override
+  public void setup() throws Exception {
+    CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
+    ReservationSystemTestUtil.setupQueueConfiguration(conf);
+    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+        ResourceScheduler.class);
+    conf.setBoolean(YarnConfiguration.RM_RESERVATION_SYSTEM_ENABLE, true);
+    configuration = conf;
+
+    super.setup();
+  }
+
+  @Test
+  public void testSubmitReservationAndCheckAfterFailover() throws Exception {
+    startRMs();
+
+    addNodeCapacityToPlan();
+
+    ClientRMService clientService = rm1.getClientRMService();
+
+    // create a reservation
+    ReservationSubmissionRequest request = createReservationSubmissionRequest();
+    ReservationSubmissionResponse response = null;
+    try {
+      response = clientService.submitReservation(request);
+    } catch (Exception e) {
+      Assert.fail(e.getMessage());
+    }
+    Assert.assertNotNull(response);
+    ReservationId reservationID = response.getReservationId();
+    Assert.assertNotNull(reservationID);
+    LOG.info("Submit reservation response: " + reservationID);
+    ReservationDefinition reservationDefinition = request
+        .getReservationDefinition();
+
+    // Do the failover
+    explicitFailover();
+
+    rm2.registerNode("127.0.0.1:1", 102400, 100);
+
+    RMState state = rm2.getRMContext().getStateStore().loadState();
+    Map<ReservationId, ReservationAllocationStateProto> reservationStateMap =
+        state.getReservationState().get(ReservationSystemTestUtil.reservationQ);
+    Assert.assertNotNull(reservationStateMap);
+    Assert.assertNotNull(reservationStateMap.get(reservationID));
+  }
+
+
+  @Test
+  public void testUpdateReservationAndCheckAfterFailover() throws Exception {
+    startRMs();
+
+    addNodeCapacityToPlan();
+
+    ClientRMService clientService = rm1.getClientRMService();
+
+    // create a reservation
+    ReservationSubmissionRequest request = createReservationSubmissionRequest();
+    ReservationSubmissionResponse response = null;
+    try {
+      response = clientService.submitReservation(request);
+    } catch (Exception e) {
+      Assert.fail(e.getMessage());
+    }
+    Assert.assertNotNull(response);
+    ReservationId reservationID = response.getReservationId();
+    Assert.assertNotNull(reservationID);
+    LOG.info("Submit reservation response: " + reservationID);
+    ReservationDefinition reservationDefinition = request
+        .getReservationDefinition();
+
+
+    // Change any field
+
+    long newDeadline = reservationDefinition.getDeadline() + 100;
+    reservationDefinition.setDeadline(newDeadline);
+    ReservationUpdateRequest updateRequest =
+        ReservationUpdateRequest.newInstance(
+          reservationDefinition, reservationID);
+    rm1.updateReservationState(updateRequest);
+
+    // Do the failover
+    explicitFailover();
+
+    rm2.registerNode("127.0.0.1:1", 102400, 100);
+
+    RMState state = rm2.getRMContext().getStateStore().loadState();
+    Map<ReservationId, ReservationAllocationStateProto> reservationStateMap =
+        state.getReservationState().get(ReservationSystemTestUtil.reservationQ);
+    Assert.assertNotNull(reservationStateMap);
+    ReservationAllocationStateProto reservationState =
+        reservationStateMap.get(reservationID);
+    Assert.assertEquals(newDeadline,
+        reservationState.getReservationDefinition().getDeadline());
+  }
+
+  @Test
+  public void testDeleteReservationAndCheckAfterFailover() throws Exception {
+    startRMs();
+
+    addNodeCapacityToPlan();
+
+    ClientRMService clientService = rm1.getClientRMService();
+
+    // create a reservation
+    ReservationSubmissionRequest request = createReservationSubmissionRequest();
+    ReservationSubmissionResponse response = null;
+    try {
+      response = clientService.submitReservation(request);
+    } catch (Exception e) {
+      Assert.fail(e.getMessage());
+    }
+    Assert.assertNotNull(response);
+    ReservationId reservationID = response.getReservationId();
+    Assert.assertNotNull(reservationID);
+
+
+    // Delete the reservation
+    ReservationDeleteRequest deleteRequest =
+        ReservationDeleteRequest.newInstance(reservationID);
+    clientService.deleteReservation(deleteRequest);
+
+    // Do the failover
+    explicitFailover();
+
+    rm2.registerNode("127.0.0.1:1", 102400, 100);
+
+    RMState state = rm2.getRMContext().getStateStore().loadState();
+    Assert.assertNull(state.getReservationState().get(
+        ReservationSystemTestUtil.reservationQ));
+  }
+
+  private void addNodeCapacityToPlan() {
+    try {
+      rm1.registerNode("127.0.0.1:1", 102400, 100);
+      int attempts = 10;
+      do {
+        DrainDispatcher dispatcher =
+            (DrainDispatcher) rm1.getRMContext().getDispatcher();
+        dispatcher.await();
+        rm1.getRMContext().getReservationSystem().synchronizePlan(
+            ReservationSystemTestUtil.reservationQ);
+        if (rm1.getRMContext().getReservationSystem().getPlan
+            (ReservationSystemTestUtil.reservationQ).getTotalCapacity()
+            .getMemory() > 0) {
+          break;
+        }
+        LOG.info("Waiting for node capacity to be added to plan");
+        Thread.sleep(100);
+      }
+      while (attempts-- > 0);
+      if (attempts <= 0) {
+        Assert.fail("Exhausted attempts in checking if node capacity was " +
+            "added to the plan");
+      }
+
+    } catch (Exception e) {
+      Assert.fail(e.getMessage());
+    }
+  }
+
+  private ReservationSubmissionRequest createReservationSubmissionRequest() {
+    Clock clock = new UTCClock();
+    long arrival = clock.getTime();
+    long duration = 60000;
+    long deadline = (long) (arrival + 1.05 * duration);
+    return ReservationSystemTestUtil.createSimpleReservationRequest(4, arrival,
+        deadline, duration);
+  }
+}

+ 26 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java

@@ -32,6 +32,7 @@ import java.util.Random;
 import java.util.TreeMap;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
 import org.apache.hadoop.yarn.api.records.ReservationDefinition;
 import org.apache.hadoop.yarn.api.records.ReservationId;
 import org.apache.hadoop.yarn.api.records.ReservationRequest;
@@ -45,6 +46,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
 import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.AlignedPlannerWithGreedy;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
@@ -189,6 +191,30 @@ public class ReservationSystemTestUtil {
     return rDef;
   }
 
+  public static ReservationSubmissionRequest createSimpleReservationRequest(
+      int numContainers, long arrival, long deadline, long duration) {
+    // create a request with a single atomic ask
+    ReservationRequest r =
+        ReservationRequest.newInstance(Resource.newInstance(1024, 1),
+            numContainers, 1, duration);
+    ReservationRequests reqs =
+        ReservationRequests.newInstance(Collections.singletonList(r),
+            ReservationRequestInterpreter.R_ALL);
+    ReservationDefinition rDef =
+        ReservationDefinition.newInstance(arrival, deadline, reqs,
+            "testClientRMService#reservation");
+    ReservationSubmissionRequest request =
+        ReservationSubmissionRequest.newInstance(rDef,
+            reservationQ);
+    return request;
+  }
+
+  public static RMContext createMockRMContext() {
+    RMContext context = mock(RMContext.class);
+    when(context.getStateStore()).thenReturn(new MemoryRMStateStore());
+    return context;
+  }
+
   @SuppressWarnings("unchecked")
   public CapacityScheduler mockCapacityScheduler(int numContainers)
       throws IOException {

+ 39 - 13
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacityOverTimePolicy.java

@@ -24,8 +24,10 @@ import java.io.IOException;
 import java.util.Map;
 import java.util.TreeMap;
 
+import org.apache.hadoop.yarn.api.records.ReservationDefinition;
 import org.apache.hadoop.yarn.api.records.ReservationRequest;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
 import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningQuotaException;
 import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.ResourceOverCommitException;
@@ -82,11 +84,12 @@ public class TestCapacityOverTimePolicy {
             instConstraint, avgConstraint);
     CapacityOverTimePolicy policy = new CapacityOverTimePolicy();
     policy.init(reservationQ, conf);
+    RMContext context = ReservationSystemTestUtil.createMockRMContext();
 
     plan =
         new InMemoryPlan(rootQueueMetrics, policy, mAgent,
             clusterResource, step, res, minAlloc, maxAlloc,
-            "dedicated", null, true);
+            "dedicated", null, true, context);
   }
 
   public int[] generateData(int length, int val) {
@@ -101,9 +104,13 @@ public class TestCapacityOverTimePolicy {
   public void testSimplePass() throws IOException, PlanningException {
     // generate allocation that simply fit within all constraints
     int[] f = generateData(3600, (int) Math.ceil(0.2 * totCont));
+
+    ReservationDefinition rDef =
+        ReservationSystemTestUtil.createSimpleReservationDefinition(
+            initTime, initTime + f.length + 1, f.length);
     assertTrue(plan.toString(),
         plan.addReservation(new InMemoryReservationAllocation(
-            ReservationSystemTestUtil.getNewReservationId(), null, "u1",
+            ReservationSystemTestUtil.getNewReservationId(), rDef, "u1",
             "dedicated", initTime, initTime + f.length,
             ReservationSystemTestUtil.generateAllocation(initTime, step, f),
             res, minAlloc)));
@@ -115,9 +122,12 @@ public class TestCapacityOverTimePolicy {
     // fit within
     // max instantanesou
     int[] f = generateData(3600, (int) Math.ceil(0.69 * totCont));
+    ReservationDefinition rDef =
+        ReservationSystemTestUtil.createSimpleReservationDefinition(
+            initTime, initTime + f.length + 1, f.length);
     assertTrue(plan.toString(),
         plan.addReservation(new InMemoryReservationAllocation(
-            ReservationSystemTestUtil.getNewReservationId(), null, "u1",
+            ReservationSystemTestUtil.getNewReservationId(), rDef, "u1",
             "dedicated", initTime, initTime + f.length,
             ReservationSystemTestUtil.generateAllocation(initTime, step, f),
             res, minAlloc)));
@@ -127,10 +137,13 @@ public class TestCapacityOverTimePolicy {
   public void testMultiTenantPass() throws IOException, PlanningException {
     // generate allocation from multiple tenants that barely fit in tot capacity
     int[] f = generateData(3600, (int) Math.ceil(0.25 * totCont));
+    ReservationDefinition rDef =
+        ReservationSystemTestUtil.createSimpleReservationDefinition(
+            initTime, initTime + f.length + 1, f.length);
     for (int i = 0; i < 4; i++) {
       assertTrue(plan.toString(),
           plan.addReservation(new InMemoryReservationAllocation(
-              ReservationSystemTestUtil.getNewReservationId(), null, "u" + i,
+              ReservationSystemTestUtil.getNewReservationId(), rDef, "u" + i,
               "dedicated", initTime, initTime + f.length,
               ReservationSystemTestUtil.generateAllocation(initTime, step, f),
               res, minAlloc)));
@@ -141,10 +154,13 @@ public class TestCapacityOverTimePolicy {
   public void testMultiTenantFail() throws IOException, PlanningException {
     // generate allocation from multiple tenants that exceed tot capacity
     int[] f = generateData(3600, (int) Math.ceil(0.25 * totCont));
+    ReservationDefinition rDef =
+        ReservationSystemTestUtil.createSimpleReservationDefinition(
+            initTime, initTime + f.length + 1, f.length);
     for (int i = 0; i < 5; i++) {
       assertTrue(plan.toString(),
           plan.addReservation(new InMemoryReservationAllocation(
-              ReservationSystemTestUtil.getNewReservationId(), null, "u" + i,
+              ReservationSystemTestUtil.getNewReservationId(), rDef, "u" + i,
               "dedicated", initTime, initTime + f.length,
               ReservationSystemTestUtil.generateAllocation(initTime, step, f),
               res, minAlloc)));
@@ -155,9 +171,12 @@ public class TestCapacityOverTimePolicy {
   public void testInstFail() throws IOException, PlanningException {
     // generate allocation that exceed the instantaneous cap single-show
     int[] f = generateData(3600, (int) Math.ceil(0.71 * totCont));
+    ReservationDefinition rDef =
+        ReservationSystemTestUtil.createSimpleReservationDefinition(
+            initTime, initTime + f.length + 1, f.length);
     assertTrue(plan.toString(),
         plan.addReservation(new InMemoryReservationAllocation(
-            ReservationSystemTestUtil.getNewReservationId(), null, "u1",
+            ReservationSystemTestUtil.getNewReservationId(), rDef, "u1",
             "dedicated", initTime, initTime + f.length,
             ReservationSystemTestUtil.generateAllocation(initTime, step, f),
             res, minAlloc)));
@@ -168,23 +187,25 @@ public class TestCapacityOverTimePolicy {
   public void testInstFailBySum() throws IOException, PlanningException {
     // generate allocation that exceed the instantaneous cap by sum
     int[] f = generateData(3600, (int) Math.ceil(0.3 * totCont));
-
+    ReservationDefinition rDef =
+        ReservationSystemTestUtil.createSimpleReservationDefinition(
+            initTime, initTime + f.length + 1, f.length);
     assertTrue(plan.toString(),
         plan.addReservation(new InMemoryReservationAllocation(
-            ReservationSystemTestUtil.getNewReservationId(), null, "u1",
+            ReservationSystemTestUtil.getNewReservationId(), rDef, "u1",
             "dedicated", initTime, initTime + f.length,
             ReservationSystemTestUtil.generateAllocation(initTime, step, f),
             res, minAlloc)));
     assertTrue(plan.toString(),
         plan.addReservation(new InMemoryReservationAllocation(
-            ReservationSystemTestUtil.getNewReservationId(), null, "u1",
+            ReservationSystemTestUtil.getNewReservationId(), rDef, "u1",
             "dedicated", initTime, initTime + f.length,
             ReservationSystemTestUtil.generateAllocation(initTime, step, f),
             res, minAlloc)));
     try {
       assertTrue(plan.toString(),
           plan.addReservation(new InMemoryReservationAllocation(
-              ReservationSystemTestUtil.getNewReservationId(), null, "u1",
+              ReservationSystemTestUtil.getNewReservationId(), rDef, "u1",
               "dedicated", initTime, initTime + f.length,
               ReservationSystemTestUtil.generateAllocation(initTime, step, f),
               res, minAlloc)));
@@ -205,10 +226,12 @@ public class TestCapacityOverTimePolicy {
         ReservationSystemUtil.toResource(
             ReservationRequest.newInstance(Resource.newInstance(1024, 1),
                 cont)));
-
+    ReservationDefinition rDef =
+        ReservationSystemTestUtil.createSimpleReservationDefinition(
+            initTime, initTime + win, win);
     assertTrue(plan.toString(),
         plan.addReservation(new InMemoryReservationAllocation(
-            ReservationSystemTestUtil.getNewReservationId(), null, "u1",
+            ReservationSystemTestUtil.getNewReservationId(), rDef, "u1",
             "dedicated", initTime, initTime + win, req, res, minAlloc)));
   }
 
@@ -222,9 +245,12 @@ public class TestCapacityOverTimePolicy {
     req.put(new ReservationInterval(initTime, initTime + win),
         ReservationSystemUtil.toResource(ReservationRequest.newInstance(Resource
             .newInstance(1024, 1), cont)));
+    ReservationDefinition rDef =
+        ReservationSystemTestUtil.createSimpleReservationDefinition(
+            initTime, initTime + win, win);
     assertTrue(plan.toString(),
         plan.addReservation(new InMemoryReservationAllocation(
-            ReservationSystemTestUtil.getNewReservationId(), null, "u1",
+            ReservationSystemTestUtil.getNewReservationId(), rDef, "u1",
             "dedicated", initTime, initTime + win, req, res, minAlloc)));
 
     try {

+ 12 - 8
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestInMemoryPlan.java

@@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.api.records.ReservationRequests;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.impl.pb.ReservationDefinitionPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.ReservationRequestsPBImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
 import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.Planner;
 import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent;
@@ -59,6 +60,7 @@ public class TestInMemoryPlan {
   private SharingPolicy policy;
   private ReservationAgent agent;
   private Planner replanner;
+  private RMContext context;
 
   @Before
   public void setUp() throws PlanningException {
@@ -73,6 +75,8 @@ public class TestInMemoryPlan {
     replanner = mock(Planner.class);
 
     when(clock.getTime()).thenReturn(1L);
+
+    context = ReservationSystemTestUtil.createMockRMContext();
   }
 
   @After
@@ -92,7 +96,7 @@ public class TestInMemoryPlan {
   public void testAddReservation() {
     Plan plan =
         new InMemoryPlan(queueMetrics, policy, agent, totalCapacity, 1L,
-            resCalc, minAlloc, maxAlloc, planName, replanner, true);
+            resCalc, minAlloc, maxAlloc, planName, replanner, true, context);
     ReservationId reservationID =
         ReservationSystemTestUtil.getNewReservationId();
     int[] alloc = { 10, 10, 10, 10, 10, 10 };
@@ -126,7 +130,7 @@ public class TestInMemoryPlan {
   public void testAddEmptyReservation() {
     Plan plan =
         new InMemoryPlan(queueMetrics, policy, agent, totalCapacity, 1L,
-            resCalc, minAlloc, maxAlloc, planName, replanner, true);
+            resCalc, minAlloc, maxAlloc, planName, replanner, true, context);
     ReservationId reservationID =
         ReservationSystemTestUtil.getNewReservationId();
     int[] alloc = {};
@@ -154,7 +158,7 @@ public class TestInMemoryPlan {
     // First add a reservation
     Plan plan =
         new InMemoryPlan(queueMetrics, policy, agent, totalCapacity, 1L,
-            resCalc, minAlloc, maxAlloc, planName, replanner, true);
+            resCalc, minAlloc, maxAlloc, planName, replanner, true, context);
     ReservationId reservationID =
         ReservationSystemTestUtil.getNewReservationId();
     int[] alloc = { 10, 10, 10, 10, 10, 10 };
@@ -199,7 +203,7 @@ public class TestInMemoryPlan {
   public void testUpdateReservation() {
     Plan plan =
         new InMemoryPlan(queueMetrics, policy, agent, totalCapacity, 1L,
-            resCalc, minAlloc, maxAlloc, planName, replanner, true);
+            resCalc, minAlloc, maxAlloc, planName, replanner, true, context);
     ReservationId reservationID =
         ReservationSystemTestUtil.getNewReservationId();
     // First add a reservation
@@ -262,7 +266,7 @@ public class TestInMemoryPlan {
   public void testUpdateNonExistingReservation() {
     Plan plan =
         new InMemoryPlan(queueMetrics, policy, agent, totalCapacity, 1L,
-            resCalc, minAlloc, maxAlloc, planName, replanner, true);
+            resCalc, minAlloc, maxAlloc, planName, replanner, true, context);
     ReservationId reservationID =
         ReservationSystemTestUtil.getNewReservationId();
     // Try to update a reservation without adding
@@ -295,7 +299,7 @@ public class TestInMemoryPlan {
     // First add a reservation
     Plan plan =
         new InMemoryPlan(queueMetrics, policy, agent, totalCapacity, 1L,
-            resCalc, minAlloc, maxAlloc, planName, replanner, true);
+            resCalc, minAlloc, maxAlloc, planName, replanner, true, context);
     ReservationId reservationID =
         ReservationSystemTestUtil.getNewReservationId();
     int[] alloc = { 10, 10, 10, 10, 10, 10 };
@@ -345,7 +349,7 @@ public class TestInMemoryPlan {
   public void testDeleteNonExistingReservation() {
     Plan plan =
         new InMemoryPlan(queueMetrics, policy, agent, totalCapacity, 1L,
-            resCalc, minAlloc, maxAlloc, planName, replanner, true);
+            resCalc, minAlloc, maxAlloc, planName, replanner, true, context);
     ReservationId reservationID =
         ReservationSystemTestUtil.getNewReservationId();
     // Try to delete a reservation without adding
@@ -365,7 +369,7 @@ public class TestInMemoryPlan {
   public void testArchiveCompletedReservations() {
     Plan plan =
         new InMemoryPlan(queueMetrics, policy, agent, totalCapacity, 1L,
-            resCalc, minAlloc, maxAlloc, planName, replanner, true);
+            resCalc, minAlloc, maxAlloc, planName, replanner, true, context);
     ReservationId reservationID1 =
         ReservationSystemTestUtil.getNewReservationId();
     // First add a reservation

+ 26 - 8
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestNoOverCommitPolicy.java

@@ -22,8 +22,10 @@ import static org.mockito.Mockito.mock;
 
 import java.io.IOException;
 
+import org.apache.hadoop.yarn.api.records.ReservationDefinition;
 import org.apache.hadoop.yarn.api.records.ReservationId;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.MismatchedUserException;
 import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
 import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.ResourceOverCommitException;
@@ -67,11 +69,12 @@ public class TestNoOverCommitPolicy {
         (ReservationSchedulerConfiguration.class);
     NoOverCommitPolicy policy = new NoOverCommitPolicy();
     policy.init(reservationQ, conf);
+    RMContext context = ReservationSystemTestUtil.createMockRMContext();
 
     plan =
         new InMemoryPlan(rootQueueMetrics, policy, mAgent,
             clusterResource, step, res, minAlloc, maxAlloc,
-            "dedicated", null, true);
+            "dedicated", null, true, context);
   }
 
   public int[] generateData(int length, int val) {
@@ -86,9 +89,12 @@ public class TestNoOverCommitPolicy {
   public void testSingleUserEasyFitPass() throws IOException, PlanningException {
     // generate allocation that easily fit within resource constraints
     int[] f = generateData(3600, (int) Math.ceil(0.2 * totCont));
+    ReservationDefinition rDef =
+        ReservationSystemTestUtil.createSimpleReservationDefinition(
+            initTime, initTime + f.length + 1, f.length);
     assertTrue(plan.toString(),
         plan.addReservation(new InMemoryReservationAllocation(
-            ReservationSystemTestUtil.getNewReservationId(), null, "u1",
+            ReservationSystemTestUtil.getNewReservationId(), rDef, "u1",
             "dedicated", initTime, initTime + f.length,
             ReservationSystemTestUtil.generateAllocation(initTime, step, f),
             res, minAlloc)));
@@ -99,9 +105,12 @@ public class TestNoOverCommitPolicy {
       PlanningException {
     // generate allocation from single tenant that barely fit
     int[] f = generateData(3600, totCont);
+    ReservationDefinition rDef =
+        ReservationSystemTestUtil.createSimpleReservationDefinition(
+            initTime, initTime + f.length + 1, f.length);
     assertTrue(plan.toString(),
         plan.addReservation(new InMemoryReservationAllocation(
-            ReservationSystemTestUtil.getNewReservationId(), null, "u1",
+            ReservationSystemTestUtil.getNewReservationId(), rDef, "u1",
             "dedicated", initTime, initTime + f.length,
             ReservationSystemTestUtil.generateAllocation(initTime, step, f),
             res, minAlloc)));
@@ -121,14 +130,17 @@ public class TestNoOverCommitPolicy {
   public void testUserMismatch() throws IOException, PlanningException {
     // generate allocation from single tenant that exceed capacity
     int[] f = generateData(3600, (int) (0.5 * totCont));
-
+    ReservationDefinition rDef =
+        ReservationSystemTestUtil.createSimpleReservationDefinition(
+            initTime, initTime + f.length + 1, f.length);
     ReservationId rid = ReservationSystemTestUtil.getNewReservationId();
-    plan.addReservation(new InMemoryReservationAllocation(rid, null, "u1",
+
+    plan.addReservation(new InMemoryReservationAllocation(rid, rDef, "u1",
         "dedicated", initTime, initTime + f.length, ReservationSystemTestUtil
             .generateAllocation(initTime, step, f), res, minAlloc));
 
     // trying to update a reservation with a mismatching user
-    plan.updateReservation(new InMemoryReservationAllocation(rid, null, "u2",
+    plan.updateReservation(new InMemoryReservationAllocation(rid, rDef, "u2",
         "dedicated", initTime, initTime + f.length, ReservationSystemTestUtil
             .generateAllocation(initTime, step, f), res, minAlloc));
   }
@@ -137,10 +149,13 @@ public class TestNoOverCommitPolicy {
   public void testMultiTenantPass() throws IOException, PlanningException {
     // generate allocation from multiple tenants that barely fit in tot capacity
     int[] f = generateData(3600, (int) Math.ceil(0.25 * totCont));
+    ReservationDefinition rDef =
+        ReservationSystemTestUtil.createSimpleReservationDefinition(
+            initTime, initTime + f.length + 1, f.length);
     for (int i = 0; i < 4; i++) {
       assertTrue(plan.toString(),
           plan.addReservation(new InMemoryReservationAllocation(
-              ReservationSystemTestUtil.getNewReservationId(), null, "u" + i,
+              ReservationSystemTestUtil.getNewReservationId(), rDef, "u" + i,
               "dedicated", initTime, initTime + f.length,
               ReservationSystemTestUtil.generateAllocation(initTime, step, f),
               res, minAlloc)));
@@ -151,10 +166,13 @@ public class TestNoOverCommitPolicy {
   public void testMultiTenantFail() throws IOException, PlanningException {
     // generate allocation from multiple tenants that exceed tot capacity
     int[] f = generateData(3600, (int) Math.ceil(0.25 * totCont));
+    ReservationDefinition rDef =
+        ReservationSystemTestUtil.createSimpleReservationDefinition(
+            initTime, initTime + f.length + 1, f.length);
     for (int i = 0; i < 5; i++) {
       assertTrue(plan.toString(),
           plan.addReservation(new InMemoryReservationAllocation(
-              ReservationSystemTestUtil.getNewReservationId(), null, "u" + i,
+              ReservationSystemTestUtil.getNewReservationId(), rDef, "u" + i,
               "dedicated", initTime, initTime + f.length,
               ReservationSystemTestUtil.generateAllocation(initTime, step, f),
               res, minAlloc)));

+ 11 - 4
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestSchedulerPlanFollowerBase.java

@@ -21,8 +21,10 @@ package org.apache.hadoop.yarn.server.resourcemanager.reservation;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ReservationDefinition;
 import org.apache.hadoop.yarn.api.records.ReservationId;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
 import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
@@ -39,6 +41,7 @@ import org.junit.Assert;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 public abstract class TestSchedulerPlanFollowerBase {
@@ -51,6 +54,7 @@ public abstract class TestSchedulerPlanFollowerBase {
   protected CapacityOverTimePolicy policy = new CapacityOverTimePolicy();
   protected Plan plan;
   private ResourceCalculator res = new DefaultResourceCalculator();
+  private RMContext context = ReservationSystemTestUtil.createMockRMContext();
 
   protected void testPlanFollower(boolean isMove) throws PlanningException,
       InterruptedException, AccessControlException {
@@ -59,27 +63,30 @@ public abstract class TestSchedulerPlanFollowerBase {
         new InMemoryPlan(scheduler.getRootQueueMetrics(), policy, mAgent,
             scheduler.getClusterResource(), 1L, res,
             scheduler.getMinimumResourceCapability(), maxAlloc, "dedicated",
-            null, isMove);
+            null, isMove, context);
 
     // add a few reservations to the plan
     long ts = System.currentTimeMillis();
     ReservationId r1 = ReservationId.newInstance(ts, 1);
     int[] f1 = { 10, 10, 10, 10, 10 };
+    ReservationDefinition rDef =
+        ReservationSystemTestUtil.createSimpleReservationDefinition(
+            0, 0 + f1.length + 1, f1.length);
     assertTrue(plan.toString(),
-        plan.addReservation(new InMemoryReservationAllocation(r1, null, "u3",
+        plan.addReservation(new InMemoryReservationAllocation(r1, rDef, "u3",
             "dedicated", 0, 0 + f1.length, ReservationSystemTestUtil
                 .generateAllocation(0L, 1L, f1), res, minAlloc)));
 
     ReservationId r2 = ReservationId.newInstance(ts, 2);
     assertTrue(plan.toString(),
-        plan.addReservation(new InMemoryReservationAllocation(r2, null, "u3",
+        plan.addReservation(new InMemoryReservationAllocation(r2, rDef, "u3",
             "dedicated", 3, 3 + f1.length, ReservationSystemTestUtil
                 .generateAllocation(3L, 1L, f1), res, minAlloc)));
 
     ReservationId r3 = ReservationId.newInstance(ts, 3);
     int[] f2 = { 0, 10, 20, 10, 0 };
     assertTrue(plan.toString(),
-        plan.addReservation(new InMemoryReservationAllocation(r3, null, "u4",
+        plan.addReservation(new InMemoryReservationAllocation(r3, rDef, "u4",
             "dedicated", 10, 10 + f2.length, ReservationSystemTestUtil
                 .generateAllocation(10L, 1L, f2), res, minAlloc)));
 

+ 7 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestAlignedPlanner.java

@@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.api.records.ReservationRequest;
 import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter;
 import org.apache.hadoop.yarn.api.records.ReservationRequests;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.reservation.CapacityOverTimePolicy;
 import org.apache.hadoop.yarn.server.resourcemanager.reservation.InMemoryPlan;
 import org.apache.hadoop.yarn.server.resourcemanager.reservation.InMemoryReservationAllocation;
@@ -724,6 +725,7 @@ public class TestAlignedPlanner {
     policy.init(reservationQ, conf);
 
     QueueMetrics queueMetrics = mock(QueueMetrics.class);
+    RMContext context = ReservationSystemTestUtil.createMockRMContext();
 
     // Set planning agent
     agent = new AlignedPlannerWithGreedy();
@@ -731,7 +733,7 @@ public class TestAlignedPlanner {
     // Create Plan
     plan =
         new InMemoryPlan(queueMetrics, policy, agent, clusterCapacity, step,
-            res, minAlloc, maxAlloc, "dedicated", null, true);
+            res, minAlloc, maxAlloc, "dedicated", null, true, context);
   }
 
   private int initializeScenario1() throws PlanningException {
@@ -782,9 +784,12 @@ public class TestAlignedPlanner {
   private void addFixedAllocation(long start, long step, int[] f)
       throws PlanningException {
 
+    ReservationDefinition rDef =
+        ReservationSystemTestUtil.createSimpleReservationDefinition(
+            start, start + f.length * step, f.length * step);
     assertTrue(plan.toString(),
         plan.addReservation(new InMemoryReservationAllocation(
-            ReservationSystemTestUtil.getNewReservationId(), null,
+            ReservationSystemTestUtil.getNewReservationId(), rDef,
             "user_fixed", "dedicated", start, start + f.length * step,
             ReservationSystemTestUtil.generateAllocation(start, step, f), res,
             minAlloc)));

+ 19 - 8
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestGreedyReservationAgent.java

@@ -37,6 +37,7 @@ import org.apache.hadoop.yarn.api.records.ReservationRequests;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.impl.pb.ReservationDefinitionPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.ReservationRequestsPBImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.reservation.CapacityOverTimePolicy;
 import org.apache.hadoop.yarn.server.resourcemanager.reservation.InMemoryPlan;
 import org.apache.hadoop.yarn.server.resourcemanager.reservation.InMemoryReservationAllocation;
@@ -89,9 +90,10 @@ public class TestGreedyReservationAgent {
     agent = new GreedyReservationAgent();
 
     QueueMetrics queueMetrics = mock(QueueMetrics.class);
+    RMContext context = ReservationSystemTestUtil.createMockRMContext();
 
     plan = new InMemoryPlan(queueMetrics, policy, agent, clusterCapacity, step,
-        res, minAlloc, maxAlloc, "dedicated", null, true);
+        res, minAlloc, maxAlloc, "dedicated", null, true, context);
   }
 
   @SuppressWarnings("javadoc")
@@ -141,9 +143,12 @@ public class TestGreedyReservationAgent {
     // create a completely utilized segment around time 30
     int[] f = { 100, 100 };
 
+    ReservationDefinition rDef =
+        ReservationSystemTestUtil.createSimpleReservationDefinition(
+            30 * step, 30 * step + f.length * step, f.length * step);
     assertTrue(plan.toString(),
         plan.addReservation(new InMemoryReservationAllocation(
-            ReservationSystemTestUtil.getNewReservationId(), null, "u1",
+            ReservationSystemTestUtil.getNewReservationId(), rDef, "u1",
             "dedicated", 30 * step, 30 * step + f.length * step,
             ReservationSystemTestUtil.generateAllocation(30 * step, step, f),
             res, minAlloc)));
@@ -195,10 +200,12 @@ public class TestGreedyReservationAgent {
     prepareBasicPlan();
     // create a completely utilized segment at time 30
     int[] f = { 100, 100 };
-
+    ReservationDefinition rDef =
+        ReservationSystemTestUtil.createSimpleReservationDefinition(
+            30, 30 * step + f.length * step, f.length * step);
     assertTrue(plan.toString(),
         plan.addReservation(new InMemoryReservationAllocation(
-            ReservationSystemTestUtil.getNewReservationId(), null, "u1",
+            ReservationSystemTestUtil.getNewReservationId(), rDef, "u1",
             "dedicated", 30 * step, 30 * step + f.length * step,
             ReservationSystemTestUtil.generateAllocation(30 * step, step, f),
             res, minAlloc)));
@@ -515,10 +522,12 @@ public class TestGreedyReservationAgent {
     // conditions for assignment that are non-empty
 
     int[] f = { 10, 10, 20, 20, 20, 10, 10 };
-
+    ReservationDefinition rDef =
+        ReservationSystemTestUtil.createSimpleReservationDefinition(
+            0, 0 + f.length * step, f.length * step);
     assertTrue(plan.toString(),
         plan.addReservation(new InMemoryReservationAllocation(
-            ReservationSystemTestUtil.getNewReservationId(), null, "u1",
+            ReservationSystemTestUtil.getNewReservationId(), rDef, "u1",
             "dedicated", 0L, 0L + f.length * step, ReservationSystemTestUtil
                 .generateAllocation(0, step, f), res, minAlloc)));
 
@@ -527,7 +536,7 @@ public class TestGreedyReservationAgent {
         ReservationSystemTestUtil.generateAllocation(5000, step, f2);
     assertTrue(plan.toString(),
         plan.addReservation(new InMemoryReservationAllocation(
-            ReservationSystemTestUtil.getNewReservationId(), null, "u1",
+            ReservationSystemTestUtil.getNewReservationId(), rDef, "u1",
             "dedicated", 5000, 5000 + f2.length * step, alloc, res, minAlloc)));
 
     System.out.println("--------BEFORE AGENT----------");
@@ -562,9 +571,11 @@ public class TestGreedyReservationAgent {
             instConstraint, avgConstraint);
     CapacityOverTimePolicy policy = new CapacityOverTimePolicy();
     policy.init(reservationQ, conf);
+    RMContext context = ReservationSystemTestUtil.createMockRMContext();
 
     plan = new InMemoryPlan(scheduler.getRootQueueMetrics(), policy, agent,
-      clusterCapacity, step, res, minAlloc, maxAlloc, "dedicated", null, true);
+      clusterCapacity, step, res, minAlloc, maxAlloc, "dedicated", null,
+        true, context);
 
     int acc = 0;
     List<ReservationDefinition> list = new ArrayList<ReservationDefinition>();

+ 15 - 8
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestSimpleCapacityReplanner.java

@@ -27,15 +27,18 @@ import static org.mockito.Mockito.when;
 import java.util.Map;
 import java.util.TreeMap;
 
+import org.apache.hadoop.yarn.api.records.ReservationDefinition;
 import org.apache.hadoop.yarn.api.records.ReservationId;
 import org.apache.hadoop.yarn.api.records.ReservationRequest;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.reservation.InMemoryPlan;
 import org.apache.hadoop.yarn.server.resourcemanager.reservation.InMemoryReservationAllocation;
 import org.apache.hadoop.yarn.server.resourcemanager.reservation.NoOverCommitPolicy;
 import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationAllocation;
 import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInterval;
 import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSchedulerConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystemTestUtil;
 import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystemUtil;
 import org.apache.hadoop.yarn.server.resourcemanager.reservation.SharingPolicy;
 import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
@@ -67,6 +70,7 @@ public class TestSimpleCapacityReplanner {
     when(clock.getTime()).thenReturn(0L);
     SimpleCapacityReplanner enf = new SimpleCapacityReplanner(clock);
 
+    RMContext context = ReservationSystemTestUtil.createMockRMContext();
     ReservationSchedulerConfiguration conf =
         mock(ReservationSchedulerConfiguration.class);
     when(conf.getEnforcementWindow(any(String.class))).thenReturn(6L);
@@ -76,52 +80,55 @@ public class TestSimpleCapacityReplanner {
     // Initialize the plan with more resources
     InMemoryPlan plan =
         new InMemoryPlan(queueMetrics, policy, agent, clusterCapacity, step,
-            res, minAlloc, maxAlloc, "dedicated", enf, true, clock);
+            res, minAlloc, maxAlloc, "dedicated", enf, true, context, clock);
 
     // add reservation filling the plan (separating them 1ms, so we are sure
     // s2 follows s1 on acceptance
     long ts = System.currentTimeMillis();
     ReservationId r1 = ReservationId.newInstance(ts, 1);
     int[] f5 = { 20, 20, 20, 20, 20 };
+    ReservationDefinition rDef =
+        ReservationSystemTestUtil.createSimpleReservationDefinition(
+            0, 0 + f5.length, f5.length);
     assertTrue(plan.toString(),
-        plan.addReservation(new InMemoryReservationAllocation(r1, null, "u3",
+        plan.addReservation(new InMemoryReservationAllocation(r1, rDef, "u3",
             "dedicated", 0, 0 + f5.length, generateAllocation(0, f5), res,
             minAlloc)));
     when(clock.getTime()).thenReturn(1L);
     ReservationId r2 = ReservationId.newInstance(ts, 2);
     assertTrue(plan.toString(),
-        plan.addReservation(new InMemoryReservationAllocation(r2, null, "u4",
+        plan.addReservation(new InMemoryReservationAllocation(r2, rDef, "u4",
             "dedicated", 0, 0 + f5.length, generateAllocation(0, f5), res,
             minAlloc)));
     when(clock.getTime()).thenReturn(2L);
     ReservationId r3 = ReservationId.newInstance(ts, 3);
     assertTrue(plan.toString(),
-        plan.addReservation(new InMemoryReservationAllocation(r3, null, "u5",
+        plan.addReservation(new InMemoryReservationAllocation(r3, rDef, "u5",
             "dedicated", 0, 0 + f5.length, generateAllocation(0, f5), res,
             minAlloc)));
     when(clock.getTime()).thenReturn(3L);
     ReservationId r4 = ReservationId.newInstance(ts, 4);
     assertTrue(plan.toString(),
-        plan.addReservation(new InMemoryReservationAllocation(r4, null, "u6",
+        plan.addReservation(new InMemoryReservationAllocation(r4, rDef, "u6",
             "dedicated", 0, 0 + f5.length, generateAllocation(0, f5), res,
             minAlloc)));
     when(clock.getTime()).thenReturn(4L);
     ReservationId r5 = ReservationId.newInstance(ts, 5);
     assertTrue(plan.toString(),
-        plan.addReservation(new InMemoryReservationAllocation(r5, null, "u7",
+        plan.addReservation(new InMemoryReservationAllocation(r5, rDef, "u7",
             "dedicated", 0, 0 + f5.length, generateAllocation(0, f5), res,
             minAlloc)));
 
     int[] f6 = { 50, 50, 50, 50, 50 };
     ReservationId r6 = ReservationId.newInstance(ts, 6);
     assertTrue(plan.toString(),
-        plan.addReservation(new InMemoryReservationAllocation(r6, null, "u3",
+        plan.addReservation(new InMemoryReservationAllocation(r6, rDef, "u3",
             "dedicated", 10, 10 + f6.length, generateAllocation(10, f6), res,
             minAlloc)));
     when(clock.getTime()).thenReturn(6L);
     ReservationId r7 = ReservationId.newInstance(ts, 7);
     assertTrue(plan.toString(),
-        plan.addReservation(new InMemoryReservationAllocation(r7, null, "u4",
+        plan.addReservation(new InMemoryReservationAllocation(r7, rDef, "u4",
             "dedicated", 10, 10 + f6.length, generateAllocation(10, f6), res,
             minAlloc)));