Browse Source

YARN-2080. Integrating reservation system with ResourceManager and client-RM protocol. Contributed by Subru Krishnan and Carlo Curino.
(cherry picked from commit 8baeaead8532898163f1006276b731a237b1a559)

Conflicts:
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
(cherry picked from commit 6261f7cc69a0eb3eebc9898c7599c7c20f432b4e)

subru 10 years ago
parent
commit
cbfbdf60d6
21 changed files with 2165 additions and 2 deletions
  1. 3 0
      YARN-1051-CHANGES.txt
  2. 19 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
  3. 102 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/YarnClient.java
  4. 25 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java
  5. 115 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java
  6. 6 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java
  7. 201 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
  8. 5 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAuditLogger.java
  9. 2 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
  10. 11 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
  11. 38 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
  12. 323 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractReservationSystem.java
  13. 146 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacityReservationSystem.java
  14. 244 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationInputValidator.java
  15. 125 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystem.java
  16. 3 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
  17. 5 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
  18. 14 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAddedSchedulerEvent.java
  19. 116 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
  20. 102 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacityReservationSystem.java
  21. 560 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestReservationInputValidator.java

+ 3 - 0
YARN-1051-CHANGES.txt

@@ -20,3 +20,6 @@ on user reservations. (Carlo Curino and Subru Krishnan via curino)
 
 
 YARN-1712. Plan follower that synchronizes the current state of reservation
 YARN-1712. Plan follower that synchronizes the current state of reservation
 subsystem with the scheduler. (Subru Krishnan and Carlo Curino  via subru)
 subsystem with the scheduler. (Subru Krishnan and Carlo Curino  via subru)
+
+YARN-2080. Integrating reservation system with ResourceManager and 
+client-RM protocol. (Subru Krishnan and Carlo Curino  via subru)

+ 19 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java

@@ -176,6 +176,25 @@ public class YarnConfiguration extends Configuration {
   public static final boolean DEFAULT_RM_SCHEDULER_USE_PORT_FOR_NODE_NAME = 
   public static final boolean DEFAULT_RM_SCHEDULER_USE_PORT_FOR_NODE_NAME = 
       false;
       false;
 
 
+  /** Whether the RM should enable Reservation System */
+  public static final String RM_RESERVATION_SYSTEM_ENABLE = RM_PREFIX
+      + "reservation-system.enable";
+  public static final boolean DEFAULT_RM_RESERVATION_SYSTEM_ENABLE = false;
+
+  /** The class to use as the Reservation System. */
+  public static final String RM_RESERVATION_SYSTEM_CLASS = RM_PREFIX
+      + "reservation-system.class";
+
+  /** The PlanFollower for the Reservation System. */
+  public static final String RM_RESERVATION_SYSTEM_PLAN_FOLLOWER = RM_PREFIX
+      + "reservation-system.plan.follower";
+
+  /** The step size of the Reservation System. */
+  public static final String RM_RESERVATION_SYSTEM_PLAN_FOLLOWER_TIME_STEP =
+      RM_PREFIX + "reservation-system.planfollower.time-step";
+  public static final long DEFAULT_RM_RESERVATION_SYSTEM_PLAN_FOLLOWER_TIME_STEP =
+      1000L;
+
   /**
   /**
    * Enable periodic monitor threads.
    * Enable periodic monitor threads.
    * @see #RM_SCHEDULER_MONITOR_POLICIES
    * @see #RM_SCHEDULER_MONITOR_POLICIES

+ 102 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/YarnClient.java

@@ -27,10 +27,17 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
 import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
 import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
@@ -43,6 +50,7 @@ import org.apache.hadoop.yarn.api.records.NodeReport;
 import org.apache.hadoop.yarn.api.records.NodeState;
 import org.apache.hadoop.yarn.api.records.NodeState;
 import org.apache.hadoop.yarn.api.records.QueueInfo;
 import org.apache.hadoop.yarn.api.records.QueueInfo;
 import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
 import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
+import org.apache.hadoop.yarn.api.records.ReservationId;
 import org.apache.hadoop.yarn.api.records.Token;
 import org.apache.hadoop.yarn.api.records.Token;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
 import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
@@ -474,4 +482,98 @@ public abstract class YarnClient extends AbstractService {
    */
    */
   public abstract void moveApplicationAcrossQueues(ApplicationId appId,
   public abstract void moveApplicationAcrossQueues(ApplicationId appId,
       String queue) throws YarnException, IOException;
       String queue) throws YarnException, IOException;
+
+  /**
+   * <p>
+   * The interface used by clients to submit a new reservation to the
+   * {@link ResourceManager}.
+   * </p>
+   * 
+   * <p>
+   * The client packages all details of its request in a
+   * {@link ReservationRequest} object. This contains information about the
+   * amount of capacity, temporal constraints, and gang needs. Furthermore, the
+   * reservation might be composed of multiple stages, with ordering
+   * dependencies among them.
+   * </p>
+   * 
+   * <p>
+   * In order to respond, a new admission control component in the
+   * {@link ResourceManager} performs an analysis of the resources that have
+   * been committed over the period of time the user is requesting, verify that
+   * the user requests can be fulfilled, and that it respect a sharing policy
+   * (e.g., {@link CapacityOverTimePolicy}). Once it has positively determined
+   * that the ReservationRequest is satisfiable the {@link ResourceManager}
+   * answers with a {@link ReservationResponse} that include a
+   * {@link ReservationId}. Upon failure to find a valid allocation the response
+   * is an exception with the message detailing the reason of failure.
+   * </p>
+   * 
+   * <p>
+   * The semantics guarantees that the ReservationId returned, corresponds to a
+   * valid reservation existing in the time-range request by the user. The
+   * amount of capacity dedicated to such reservation can vary overtime,
+   * depending of the allocation that has been determined. But it is guaranteed
+   * to satisfy all the constraint expressed by the user in the
+   * {@link ReservationRequest}
+   * </p>
+   * 
+   * @param request request to submit a new Reservation
+   * @return response contains the {@link ReservationId} on accepting the
+   *         submission
+   * @throws YarnException if the reservation cannot be created successfully
+   * @throws IOException
+   * 
+   */
+  @Public
+  @Unstable
+  public abstract ReservationSubmissionResponse submitReservation(
+      ReservationSubmissionRequest request) throws YarnException, IOException;
+
+  /**
+   * <p>
+   * The interface used by clients to update an existing Reservation. This is
+   * referred to as a re-negotiation process, in which a user that has
+   * previously submitted a Reservation.
+   * </p>
+   * 
+   * <p>
+   * The allocation is attempted by virtually substituting all previous
+   * allocations related to this Reservation with new ones, that satisfy the new
+   * {@link ReservationRequest}. Upon success the previous allocation is
+   * atomically substituted by the new one, and on failure (i.e., if the system
+   * cannot find a valid allocation for the updated request), the previous
+   * allocation remains valid.
+   * </p>
+   * 
+   * @param request to update an existing Reservation (the ReservationRequest
+   *          should refer to an existing valid {@link ReservationId})
+   * @return response empty on successfully updating the existing reservation
+   * @throws YarnException if the request is invalid or reservation cannot be
+   *           updated successfully
+   * @throws IOException
+   * 
+   */
+  @Public
+  @Unstable
+  public abstract ReservationUpdateResponse updateReservation(
+      ReservationUpdateRequest request) throws YarnException, IOException;
+
+  /**
+   * <p>
+   * The interface used by clients to remove an existing Reservation.
+   * </p>
+   * 
+   * @param request to remove an existing Reservation (the ReservationRequest
+   *          should refer to an existing valid {@link ReservationId})
+   * @return response empty on successfully deleting the existing reservation
+   * @throws YarnException if the request is invalid or reservation cannot be
+   *           deleted successfully
+   * @throws IOException
+   * 
+   */
+  @Public
+  @Unstable
+  public abstract ReservationDeleteResponse deleteReservation(
+      ReservationDeleteRequest request) throws YarnException, IOException;
 }
 }

+ 25 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java

@@ -63,6 +63,12 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
@@ -646,4 +652,23 @@ public class YarnClientImpl extends YarnClient {
         MoveApplicationAcrossQueuesRequest.newInstance(appId, queue);
         MoveApplicationAcrossQueuesRequest.newInstance(appId, queue);
     rmClient.moveApplicationAcrossQueues(request);
     rmClient.moveApplicationAcrossQueues(request);
   }
   }
+
+  @Override
+  public ReservationSubmissionResponse submitReservation(
+      ReservationSubmissionRequest request) throws YarnException, IOException {
+    return rmClient.submitReservation(request);
+  }
+
+  @Override
+  public ReservationUpdateResponse updateReservation(
+      ReservationUpdateRequest request) throws YarnException, IOException {
+    return rmClient.updateReservation(request);
+  }
+
+  @Override
+  public ReservationDeleteResponse deleteReservation(
+      ReservationDeleteRequest request) throws YarnException, IOException {
+    return rmClient.deleteReservation(request);
+  }
+
 }
 }

+ 115 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java

@@ -31,6 +31,7 @@ import java.nio.ByteBuffer;
 import java.security.PrivilegedExceptionAction;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.EnumSet;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.HashSet;
@@ -63,6 +64,12 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetContainersRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetContainersResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.GetContainersResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
@@ -76,6 +83,11 @@ import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.ReservationDefinition;
+import org.apache.hadoop.yarn.api.records.ReservationId;
+import org.apache.hadoop.yarn.api.records.ReservationRequest;
+import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter;
+import org.apache.hadoop.yarn.api.records.ReservationRequests;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
 import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
@@ -89,8 +101,14 @@ import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier;
 import org.apache.hadoop.yarn.server.MiniYARNCluster;
 import org.apache.hadoop.yarn.server.MiniYARNCluster;
 import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
 import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystemTestUtil;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
+import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.Records;
 import org.apache.hadoop.yarn.util.Records;
+import org.apache.hadoop.yarn.util.UTCClock;
 import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
 import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
 import org.apache.log4j.Level;
 import org.apache.log4j.Level;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.LogManager;
@@ -834,4 +852,101 @@ public class TestYarnClient {
       client.stop();
       client.stop();
     }
     }
   }
   }
+  
+  @Test
+  public void testReservationAPIs() {
+    // initialize
+    CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
+    ReservationSystemTestUtil.setupQueueConfiguration(conf);
+    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+        ResourceScheduler.class);
+    conf.setBoolean(YarnConfiguration.RM_RESERVATION_SYSTEM_ENABLE, true);
+    MiniYARNCluster cluster =
+        new MiniYARNCluster("testReservationAPIs", 2, 1, 1);
+    YarnClient client = null;
+    try {
+      cluster.init(conf);
+      cluster.start();
+      final Configuration yarnConf = cluster.getConfig();
+      client = YarnClient.createYarnClient();
+      client.init(yarnConf);
+      client.start();
+
+      // create a reservation
+      Clock clock = new UTCClock();
+      long arrival = clock.getTime();
+      long duration = 60000;
+      long deadline = (long) (arrival + 1.05 * duration);
+      ReservationSubmissionRequest sRequest =
+          createSimpleReservationRequest(4, arrival, deadline, duration);
+      ReservationSubmissionResponse sResponse = null;
+      try {
+        sResponse = client.submitReservation(sRequest);
+      } catch (Exception e) {
+        Assert.fail(e.getMessage());
+      }
+      Assert.assertNotNull(sResponse);
+      ReservationId reservationID = sResponse.getReservationId();
+      Assert.assertNotNull(reservationID);
+      System.out.println("Submit reservation response: " + reservationID);
+
+      // Update the reservation
+      ReservationDefinition rDef = sRequest.getReservationDefinition();
+      ReservationRequest rr =
+          rDef.getReservationRequests().getReservationResources().get(0);
+      rr.setNumContainers(5);
+      arrival = clock.getTime();
+      duration = 30000;
+      deadline = (long) (arrival + 1.05 * duration);
+      rr.setDuration(duration);
+      rDef.setArrival(arrival);
+      rDef.setDeadline(deadline);
+      ReservationUpdateRequest uRequest =
+          ReservationUpdateRequest.newInstance(rDef, reservationID);
+      ReservationUpdateResponse uResponse = null;
+      try {
+        uResponse = client.updateReservation(uRequest);
+      } catch (Exception e) {
+        Assert.fail(e.getMessage());
+      }
+      Assert.assertNotNull(sResponse);
+      System.out.println("Update reservation response: " + uResponse);
+
+      // Delete the reservation
+      ReservationDeleteRequest dRequest =
+          ReservationDeleteRequest.newInstance(reservationID);
+      ReservationDeleteResponse dResponse = null;
+      try {
+        dResponse = client.deleteReservation(dRequest);
+      } catch (Exception e) {
+        Assert.fail(e.getMessage());
+      }
+      Assert.assertNotNull(sResponse);
+      System.out.println("Delete reservation response: " + dResponse);
+    } finally {
+      // clean-up
+      if (client != null) {
+        client.stop();
+      }
+      cluster.stop();
+    }
+  }
+
+  private ReservationSubmissionRequest createSimpleReservationRequest(
+      int numContainers, long arrival, long deadline, long duration) {
+    // create a request with a single atomic ask
+    ReservationRequest r =
+        ReservationRequest.newInstance(Resource.newInstance(1024, 1),
+            numContainers, 1, duration);
+    ReservationRequests reqs =
+        ReservationRequests.newInstance(Collections.singletonList(r),
+            ReservationRequestInterpreter.R_ALL);
+    ReservationDefinition rDef =
+        ReservationDefinition.newInstance(arrival, deadline, reqs,
+            "testYarnClient#reservation");
+    ReservationSubmissionRequest request =
+        ReservationSubmissionRequest.newInstance(rDef,
+            ReservationSystemTestUtil.reservationQ);
+    return request;
+  }
 }
 }

+ 6 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java

@@ -71,6 +71,7 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMapp
 import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsResponse;
 import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsResponse;
 import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceResponse;
 import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceResponse;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdateEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdateEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider;
 import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider;
@@ -348,6 +349,11 @@ public class AdminService extends CompositeService implements
         recordFactory.newRecordInstance(RefreshQueuesResponse.class);
         recordFactory.newRecordInstance(RefreshQueuesResponse.class);
     try {
     try {
       rmContext.getScheduler().reinitialize(getConfig(), this.rmContext);
       rmContext.getScheduler().reinitialize(getConfig(), this.rmContext);
+      // refresh the reservation system
+      ReservationSystem rSystem = rmContext.getReservationSystem();
+      if (rSystem != null) {
+        rSystem.reinitialize(getConfig(), rmContext);
+      }
       RMAuditLogger.logSuccess(user.getShortUserName(), argName,
       RMAuditLogger.logSuccess(user.getShortUserName(), argName,
           "AdminService");
           "AdminService");
       return response;
       return response;

+ 201 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java

@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStream;
 import java.net.InetSocketAddress;
 import java.net.InetSocketAddress;
 import java.security.AccessControlException;
 import java.security.AccessControlException;
+import java.text.MessageFormat;
 import java.util.ArrayList;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Collections;
@@ -79,6 +80,12 @@ import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesReq
 import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
@@ -93,6 +100,8 @@ import org.apache.hadoop.yarn.api.records.NodeReport;
 import org.apache.hadoop.yarn.api.records.NodeState;
 import org.apache.hadoop.yarn.api.records.NodeState;
 import org.apache.hadoop.yarn.api.records.QueueACL;
 import org.apache.hadoop.yarn.api.records.QueueACL;
 import org.apache.hadoop.yarn.api.records.QueueInfo;
 import org.apache.hadoop.yarn.api.records.QueueInfo;
+import org.apache.hadoop.yarn.api.records.ReservationDefinition;
+import org.apache.hadoop.yarn.api.records.ReservationId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
 import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
@@ -107,6 +116,10 @@ import org.apache.hadoop.yarn.ipc.RPCUtil;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
 import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
 import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
 import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
 import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInputValidator;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
@@ -123,7 +136,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenS
 import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider;
 import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider;
 import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
 import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.Records;
 import org.apache.hadoop.yarn.util.Records;
+import org.apache.hadoop.yarn.util.UTCClock;
 
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.Futures;
@@ -153,10 +168,23 @@ public class ClientRMService extends AbstractService implements
   private final ApplicationACLsManager applicationsACLsManager;
   private final ApplicationACLsManager applicationsACLsManager;
   private final QueueACLsManager queueACLsManager;
   private final QueueACLsManager queueACLsManager;
 
 
+  // For Reservation APIs
+  private Clock clock;
+  private ReservationSystem reservationSystem;
+  private ReservationInputValidator rValidator;
+
   public ClientRMService(RMContext rmContext, YarnScheduler scheduler,
   public ClientRMService(RMContext rmContext, YarnScheduler scheduler,
       RMAppManager rmAppManager, ApplicationACLsManager applicationACLsManager,
       RMAppManager rmAppManager, ApplicationACLsManager applicationACLsManager,
       QueueACLsManager queueACLsManager,
       QueueACLsManager queueACLsManager,
       RMDelegationTokenSecretManager rmDTSecretManager) {
       RMDelegationTokenSecretManager rmDTSecretManager) {
+    this(rmContext, scheduler, rmAppManager, applicationACLsManager,
+        queueACLsManager, rmDTSecretManager, new UTCClock());
+  }
+
+  public ClientRMService(RMContext rmContext, YarnScheduler scheduler,
+      RMAppManager rmAppManager, ApplicationACLsManager applicationACLsManager,
+      QueueACLsManager queueACLsManager,
+      RMDelegationTokenSecretManager rmDTSecretManager, Clock clock) {
     super(ClientRMService.class.getName());
     super(ClientRMService.class.getName());
     this.scheduler = scheduler;
     this.scheduler = scheduler;
     this.rmContext = rmContext;
     this.rmContext = rmContext;
@@ -164,6 +192,9 @@ public class ClientRMService extends AbstractService implements
     this.applicationsACLsManager = applicationACLsManager;
     this.applicationsACLsManager = applicationACLsManager;
     this.queueACLsManager = queueACLsManager;
     this.queueACLsManager = queueACLsManager;
     this.rmDTSecretManager = rmDTSecretManager;
     this.rmDTSecretManager = rmDTSecretManager;
+    this.reservationSystem = rmContext.getReservationSystem();
+    this.clock = clock;
+    this.rValidator = new ReservationInputValidator(clock);
   }
   }
 
 
   @Override
   @Override
@@ -1032,4 +1063,174 @@ public class ClientRMService extends AbstractService implements
   public Server getServer() {
   public Server getServer() {
     return this.server;
     return this.server;
   }
   }
+
+  @Override
+  public ReservationSubmissionResponse submitReservation(
+      ReservationSubmissionRequest request) throws YarnException, IOException {
+    // Check if reservation system is enabled
+    checkReservationSytem(AuditConstants.SUBMIT_RESERVATION_REQUEST);
+    ReservationSubmissionResponse response =
+        recordFactory.newRecordInstance(ReservationSubmissionResponse.class);
+    // Create a new Reservation Id
+    ReservationId reservationId = reservationSystem.getNewReservationId();
+    // Validate the input
+    Plan plan =
+        rValidator.validateReservationSubmissionRequest(reservationSystem,
+            request, reservationId);
+    // Check ACLs
+    String queueName = request.getQueue();
+    String user =
+        checkReservationACLs(queueName,
+            AuditConstants.SUBMIT_RESERVATION_REQUEST);
+    try {
+      // Try to place the reservation using the agent
+      boolean result =
+          plan.getReservationAgent().createReservation(reservationId, user,
+              plan, request.getReservationDefinition());
+      if (result) {
+        // add the reservation id to valid ones maintained by reservation
+        // system
+        reservationSystem.setQueueForReservation(reservationId, queueName);
+        // create the reservation synchronously if required
+        refreshScheduler(queueName, request.getReservationDefinition(),
+            reservationId.toString());
+        // return the reservation id
+        response.setReservationId(reservationId);
+      }
+    } catch (PlanningException e) {
+      RMAuditLogger.logFailure(user, AuditConstants.SUBMIT_RESERVATION_REQUEST,
+          e.getMessage(), "ClientRMService",
+          "Unable to create the reservation: " + reservationId);
+      throw RPCUtil.getRemoteException(e);
+    }
+    RMAuditLogger.logSuccess(user, AuditConstants.SUBMIT_RESERVATION_REQUEST,
+        "ClientRMService: " + reservationId);
+    return response;
+  }
+
+  @Override
+  public ReservationUpdateResponse updateReservation(
+      ReservationUpdateRequest request) throws YarnException, IOException {
+    // Check if reservation system is enabled
+    checkReservationSytem(AuditConstants.UPDATE_RESERVATION_REQUEST);
+    ReservationUpdateResponse response =
+        recordFactory.newRecordInstance(ReservationUpdateResponse.class);
+    // Validate the input
+    Plan plan =
+        rValidator.validateReservationUpdateRequest(reservationSystem, request);
+    ReservationId reservationId = request.getReservationId();
+    String queueName = reservationSystem.getQueueForReservation(reservationId);
+    // Check ACLs
+    String user =
+        checkReservationACLs(queueName,
+            AuditConstants.UPDATE_RESERVATION_REQUEST);
+    // Try to update the reservation using default agent
+    try {
+      boolean result =
+          plan.getReservationAgent().updateReservation(reservationId, user,
+              plan, request.getReservationDefinition());
+      if (!result) {
+        String errMsg = "Unable to update reservation: " + reservationId;
+        RMAuditLogger.logFailure(user,
+            AuditConstants.UPDATE_RESERVATION_REQUEST, errMsg,
+            "ClientRMService", errMsg);
+        throw RPCUtil.getRemoteException(errMsg);
+      }
+    } catch (PlanningException e) {
+      RMAuditLogger.logFailure(user, AuditConstants.UPDATE_RESERVATION_REQUEST,
+          e.getMessage(), "ClientRMService",
+          "Unable to update the reservation: " + reservationId);
+      throw RPCUtil.getRemoteException(e);
+    }
+    RMAuditLogger.logSuccess(user, AuditConstants.UPDATE_RESERVATION_REQUEST,
+        "ClientRMService: " + reservationId);
+    return response;
+  }
+
+  @Override
+  public ReservationDeleteResponse deleteReservation(
+      ReservationDeleteRequest request) throws YarnException, IOException {
+    // Check if reservation system is enabled
+    checkReservationSytem(AuditConstants.DELETE_RESERVATION_REQUEST);
+    ReservationDeleteResponse response =
+        recordFactory.newRecordInstance(ReservationDeleteResponse.class);
+    // Validate the input
+    Plan plan =
+        rValidator.validateReservationDeleteRequest(reservationSystem, request);
+    ReservationId reservationId = request.getReservationId();
+    String queueName = reservationSystem.getQueueForReservation(reservationId);
+    // Check ACLs
+    String user =
+        checkReservationACLs(queueName,
+            AuditConstants.DELETE_RESERVATION_REQUEST);
+    // Try to update the reservation using default agent
+    try {
+      boolean result =
+          plan.getReservationAgent().deleteReservation(reservationId, user,
+              plan);
+      if (!result) {
+        String errMsg = "Could not delete reservation: " + reservationId;
+        RMAuditLogger.logFailure(user,
+            AuditConstants.DELETE_RESERVATION_REQUEST, errMsg,
+            "ClientRMService", errMsg);
+        throw RPCUtil.getRemoteException(errMsg);
+      }
+    } catch (PlanningException e) {
+      RMAuditLogger.logFailure(user, AuditConstants.DELETE_RESERVATION_REQUEST,
+          e.getMessage(), "ClientRMService",
+          "Unable to delete the reservation: " + reservationId);
+      throw RPCUtil.getRemoteException(e);
+    }
+    RMAuditLogger.logSuccess(user, AuditConstants.DELETE_RESERVATION_REQUEST,
+        "ClientRMService: " + reservationId);
+    return response;
+  }
+
+  private void checkReservationSytem(String auditConstant) throws YarnException {
+    // Check if reservation is enabled
+    if (reservationSystem == null) {
+      throw RPCUtil.getRemoteException("Reservation is not enabled."
+          + " Please enable & try again");
+    }
+  }
+
+  private void refreshScheduler(String planName,
+      ReservationDefinition contract, String reservationId) {
+    if ((contract.getArrival() - clock.getTime()) < reservationSystem
+        .getPlanFollowerTimeStep()) {
+      LOG.debug(MessageFormat
+          .format(
+              "Reservation {0} is within threshold so attempting to create synchronously.",
+              reservationId));
+      reservationSystem.synchronizePlan(planName);
+      LOG.info(MessageFormat.format("Created reservation {0} synchronously.",
+          reservationId));
+    }
+  }
+
+  private String checkReservationACLs(String queueName, String auditConstant)
+      throws YarnException {
+    UserGroupInformation callerUGI;
+    try {
+      callerUGI = UserGroupInformation.getCurrentUser();
+    } catch (IOException ie) {
+      RMAuditLogger.logFailure("UNKNOWN", auditConstant, queueName,
+          "ClientRMService", "Error getting UGI");
+      throw RPCUtil.getRemoteException(ie);
+    }
+    // Check if user has access on the managed queue
+    if (!queueACLsManager.checkAccess(callerUGI, QueueACL.SUBMIT_APPLICATIONS,
+        queueName)) {
+      RMAuditLogger.logFailure(
+          callerUGI.getShortUserName(),
+          auditConstant,
+          "User doesn't have permissions to "
+              + QueueACL.SUBMIT_APPLICATIONS.toString(), "ClientRMService",
+          AuditConstants.UNAUTHORIZED_USER);
+      throw RPCUtil.getRemoteException(new AccessControlException("User "
+          + callerUGI.getShortUserName() + " cannot perform operation "
+          + QueueACL.SUBMIT_APPLICATIONS.name() + " on queue" + queueName));
+    }
+    return callerUGI.getShortUserName();
+  }
 }
 }

+ 5 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAuditLogger.java

@@ -56,6 +56,11 @@ public class RMAuditLogger {
 
 
     // Some commonly used descriptions
     // Some commonly used descriptions
     public static final String UNAUTHORIZED_USER = "Unauthorized user";
     public static final String UNAUTHORIZED_USER = "Unauthorized user";
+    
+    // For Reservation system
+    public static final String SUBMIT_RESERVATION_REQUEST = "Submit Reservation Request";
+    public static final String UPDATE_RESERVATION_REQUEST = "Update Reservation Request";
+    public static final String DELETE_RESERVATION_REQUEST = "Delete Reservation Request";
   }
   }
 
 
   /**
   /**

+ 2 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java

@@ -109,5 +109,7 @@ public interface RMContext {
   
   
   long getEpoch();
   long getEpoch();
 
 
+  ReservationSystem getReservationSystem();
+
   boolean isSchedulerReadyForAllocatingContainers();
   boolean isSchedulerReadyForAllocatingContainers();
 }
 }

+ 11 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java

@@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWri
 import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
 import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
@@ -83,6 +84,7 @@ public class RMContextImpl implements RMContext {
   private ClientRMService clientRMService;
   private ClientRMService clientRMService;
   private RMDelegationTokenSecretManager rmDelegationTokenSecretManager;
   private RMDelegationTokenSecretManager rmDelegationTokenSecretManager;
   private ResourceScheduler scheduler;
   private ResourceScheduler scheduler;
+  private ReservationSystem reservationSystem;
   private NodesListManager nodesListManager;
   private NodesListManager nodesListManager;
   private ResourceTrackerService resourceTrackerService;
   private ResourceTrackerService resourceTrackerService;
   private ApplicationMasterService applicationMasterService;
   private ApplicationMasterService applicationMasterService;
@@ -208,6 +210,11 @@ public class RMContextImpl implements RMContext {
     return this.scheduler;
     return this.scheduler;
   }
   }
 
 
+  @Override
+  public ReservationSystem getReservationSystem() {
+    return this.reservationSystem;
+  }
+  
   @Override
   @Override
   public NodesListManager getNodesListManager() {
   public NodesListManager getNodesListManager() {
     return this.nodesListManager;
     return this.nodesListManager;
@@ -303,6 +310,10 @@ public class RMContextImpl implements RMContext {
   void setScheduler(ResourceScheduler scheduler) {
   void setScheduler(ResourceScheduler scheduler) {
     this.scheduler = scheduler;
     this.scheduler = scheduler;
   }
   }
+  
+  void setReservationSystem(ReservationSystem reservationSystem) {
+    this.reservationSystem = reservationSystem;
+  }
 
 
   void setDelegationTokenRenewer(
   void setDelegationTokenRenewer(
       DelegationTokenRenewer delegationTokenRenewer) {
       DelegationTokenRenewer delegationTokenRenewer) {

+ 38 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java

@@ -72,6 +72,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStoreFactory;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStoreFactory;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.AbstractReservationSystem;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
@@ -147,6 +149,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
   protected RMSecretManagerService rmSecretManagerService;
   protected RMSecretManagerService rmSecretManagerService;
 
 
   protected ResourceScheduler scheduler;
   protected ResourceScheduler scheduler;
+  protected ReservationSystem reservationSystem;
   private ClientRMService clientRM;
   private ClientRMService clientRM;
   protected ApplicationMasterService masterService;
   protected ApplicationMasterService masterService;
   protected NMLivelinessMonitor nmLivelinessMonitor;
   protected NMLivelinessMonitor nmLivelinessMonitor;
@@ -281,6 +284,29 @@ public class ResourceManager extends CompositeService implements Recoverable {
     }
     }
   }
   }
 
 
+  protected ReservationSystem createReservationSystem() {
+    String reservationClassName =
+        conf.get(YarnConfiguration.RM_RESERVATION_SYSTEM_CLASS,
+            AbstractReservationSystem.getDefaultReservationSystem(scheduler));
+    if (reservationClassName == null) {
+      return null;
+    }
+    LOG.info("Using ReservationSystem: " + reservationClassName);
+    try {
+      Class<?> reservationClazz = Class.forName(reservationClassName);
+      if (ReservationSystem.class.isAssignableFrom(reservationClazz)) {
+        return (ReservationSystem) ReflectionUtils.newInstance(
+            reservationClazz, this.conf);
+      } else {
+        throw new YarnRuntimeException("Class: " + reservationClassName
+            + " not instance of " + ReservationSystem.class.getCanonicalName());
+      }
+    } catch (ClassNotFoundException e) {
+      throw new YarnRuntimeException(
+          "Could not instantiate ReservationSystem: " + reservationClassName, e);
+    }
+  }
+
   protected ApplicationMasterLauncher createAMLauncher() {
   protected ApplicationMasterLauncher createAMLauncher() {
     return new ApplicationMasterLauncher(this.rmContext);
     return new ApplicationMasterLauncher(this.rmContext);
   }
   }
@@ -456,6 +482,18 @@ public class ResourceManager extends CompositeService implements Recoverable {
       DefaultMetricsSystem.initialize("ResourceManager");
       DefaultMetricsSystem.initialize("ResourceManager");
       JvmMetrics.initSingleton("ResourceManager", null);
       JvmMetrics.initSingleton("ResourceManager", null);
 
 
+      // Initialize the Reservation system
+      if (conf.getBoolean(YarnConfiguration.RM_RESERVATION_SYSTEM_ENABLE,
+          YarnConfiguration.DEFAULT_RM_RESERVATION_SYSTEM_ENABLE)) {
+        reservationSystem = createReservationSystem();
+        if (reservationSystem != null) {
+          reservationSystem.setRMContext(rmContext);
+          addIfService(reservationSystem);
+          rmContext.setReservationSystem(reservationSystem);
+          LOG.info("Initialized Reservation system");
+        }
+      }
+
       // creating monitors that handle preemption
       // creating monitors that handle preemption
       createPolicyMonitors();
       createPolicyMonitors();
 
 

+ 323 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractReservationSystem.java

@@ -0,0 +1,323 @@
+/**
+ *
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.reservation;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.yarn.api.records.ReservationId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.hadoop.yarn.util.UTCClock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This is the implementation of {@link ReservationSystem} based on the
+ * {@link ResourceScheduler}
+ */
+@LimitedPrivate("yarn")
+@Unstable
+public abstract class AbstractReservationSystem extends AbstractService
+    implements ReservationSystem {
+
+  private static final Logger LOG = LoggerFactory
+      .getLogger(AbstractReservationSystem.class);
+
+  // private static final String DEFAULT_CAPACITY_SCHEDULER_PLAN
+
+  private final ReentrantReadWriteLock readWriteLock =
+      new ReentrantReadWriteLock(true);
+  private final Lock readLock = readWriteLock.readLock();
+  private final Lock writeLock = readWriteLock.writeLock();
+
+  private boolean initialized = false;
+
+  private final Clock clock = new UTCClock();
+
+  private AtomicLong resCounter = new AtomicLong();
+
+  private Map<String, Plan> plans = new HashMap<String, Plan>();
+
+  private Map<ReservationId, String> resQMap =
+      new HashMap<ReservationId, String>();
+
+  private RMContext rmContext;
+
+  private ResourceScheduler scheduler;
+
+  private ScheduledExecutorService scheduledExecutorService;
+
+  protected Configuration conf;
+
+  protected long planStepSize;
+
+  private PlanFollower planFollower;
+
+  /**
+   * Construct the service.
+   * 
+   * @param name service name
+   */
+  public AbstractReservationSystem(String name) {
+    super(name);
+  }
+
+  @Override
+  public void setRMContext(RMContext rmContext) {
+    writeLock.lock();
+    try {
+      this.rmContext = rmContext;
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  @Override
+  public void reinitialize(Configuration conf, RMContext rmContext)
+      throws YarnException {
+    writeLock.lock();
+    try {
+      if (!initialized) {
+        initialize(conf);
+        initialized = true;
+      } else {
+        initializeNewPlans(conf);
+      }
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  private void initialize(Configuration conf) throws YarnException {
+    LOG.info("Initializing Reservation system");
+    this.conf = conf;
+    scheduler = rmContext.getScheduler();
+    // Get the plan step size
+    planStepSize =
+        conf.getTimeDuration(
+            YarnConfiguration.RM_RESERVATION_SYSTEM_PLAN_FOLLOWER_TIME_STEP,
+            YarnConfiguration.DEFAULT_RM_RESERVATION_SYSTEM_PLAN_FOLLOWER_TIME_STEP,
+            TimeUnit.MILLISECONDS);
+    if (planStepSize < 0) {
+      planStepSize =
+          YarnConfiguration.DEFAULT_RM_RESERVATION_SYSTEM_PLAN_FOLLOWER_TIME_STEP;
+    }
+    // Create a plan corresponding to every reservable queue
+    Set<String> planQueueNames = scheduler.getPlanQueues();
+    for (String planQueueName : planQueueNames) {
+      Plan plan = initializePlan(planQueueName);
+      plans.put(planQueueName, plan);
+    }
+  }
+
+  private void initializeNewPlans(Configuration conf) {
+    LOG.info("Refreshing Reservation system");
+    writeLock.lock();
+    try {
+      // Create a plan corresponding to every new reservable queue
+      Set<String> planQueueNames = scheduler.getPlanQueues();
+      for (String planQueueName : planQueueNames) {
+        if (!plans.containsKey(planQueueName)) {
+          Plan plan = initializePlan(planQueueName);
+          plans.put(planQueueName, plan);
+        } else {
+          LOG.warn("Plan based on reservation queue {0} already exists.",
+              planQueueName);
+        }
+      }
+      // Update the plan follower with the active plans
+      if (planFollower != null) {
+        planFollower.setPlans(plans.values());
+      }
+    } catch (YarnException e) {
+      LOG.warn("Exception while trying to refresh reservable queues", e);
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  private PlanFollower createPlanFollower() {
+    String planFollowerPolicyClassName =
+        conf.get(YarnConfiguration.RM_RESERVATION_SYSTEM_PLAN_FOLLOWER,
+            getDefaultPlanFollower());
+    if (planFollowerPolicyClassName == null) {
+      return null;
+    }
+    LOG.info("Using PlanFollowerPolicy: " + planFollowerPolicyClassName);
+    try {
+      Class<?> planFollowerPolicyClazz =
+          conf.getClassByName(planFollowerPolicyClassName);
+      if (PlanFollower.class.isAssignableFrom(planFollowerPolicyClazz)) {
+        return (PlanFollower) ReflectionUtils.newInstance(
+            planFollowerPolicyClazz, conf);
+      } else {
+        throw new YarnRuntimeException("Class: " + planFollowerPolicyClassName
+            + " not instance of " + PlanFollower.class.getCanonicalName());
+      }
+    } catch (ClassNotFoundException e) {
+      throw new YarnRuntimeException(
+          "Could not instantiate PlanFollowerPolicy: "
+              + planFollowerPolicyClassName, e);
+    }
+  }
+
+  private String getDefaultPlanFollower() {
+    // currently only capacity scheduler is supported
+    if (scheduler instanceof CapacityScheduler) {
+      return CapacitySchedulerPlanFollower.class.getName();
+    }
+    return null;
+  }
+
+  @Override
+  public Plan getPlan(String planName) {
+    readLock.lock();
+    try {
+      return plans.get(planName);
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  /**
+   * @return the planStepSize
+   */
+  @Override
+  public long getPlanFollowerTimeStep() {
+    readLock.lock();
+    try {
+      return planStepSize;
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  @Override
+  public void synchronizePlan(String planName) {
+    writeLock.lock();
+    try {
+      Plan plan = plans.get(planName);
+      if (plan != null) {
+        planFollower.synchronizePlan(plan);
+      }
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  @Override
+  public void serviceInit(Configuration conf) throws Exception {
+    Configuration configuration = new Configuration(conf);
+    reinitialize(configuration, rmContext);
+    // Create the plan follower with the active plans
+    planFollower = createPlanFollower();
+    if (planFollower != null) {
+      planFollower.init(clock, scheduler, plans.values());
+    }
+    super.serviceInit(conf);
+  }
+
+  @Override
+  public void serviceStart() throws Exception {
+    if (planFollower != null) {
+      scheduledExecutorService = new ScheduledThreadPoolExecutor(1);
+      scheduledExecutorService.scheduleWithFixedDelay(planFollower, 0L,
+          planStepSize, TimeUnit.MILLISECONDS);
+    }
+    super.serviceStart();
+  }
+
+  @Override
+  public void serviceStop() {
+    // Stop the plan follower
+    if (scheduledExecutorService != null
+        && !scheduledExecutorService.isShutdown()) {
+      scheduledExecutorService.shutdown();
+    }
+    // Clear the plans
+    plans.clear();
+  }
+
+  @Override
+  public String getQueueForReservation(ReservationId reservationId) {
+    readLock.lock();
+    try {
+      return resQMap.get(reservationId);
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  @Override
+  public void setQueueForReservation(ReservationId reservationId,
+      String queueName) {
+    writeLock.lock();
+    try {
+      resQMap.put(reservationId, queueName);
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  @Override
+  public ReservationId getNewReservationId() {
+    writeLock.lock();
+    try {
+      ReservationId resId =
+          ReservationId.newInstance(ResourceManager.getClusterTimeStamp(),
+              resCounter.incrementAndGet());
+      LOG.info("Allocated new reservationId: " + resId);
+      return resId;
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  @Override
+  public Map<String, Plan> getAllPlans() {
+    return plans;
+  }
+
+  /**
+   * Get the default reservation system corresponding to the scheduler
+   * 
+   * @param scheduler the scheduler for which the reservation system is required
+   */
+  public static String getDefaultReservationSystem(ResourceScheduler scheduler) {
+    // currently only capacity scheduler is supported
+    if (scheduler instanceof CapacityScheduler) {
+      return CapacityReservationSystem.class.getName();
+    }
+    return null;
+  }
+
+  protected abstract Plan initializePlan(String planQueueName)
+      throws YarnException;
+
+  protected abstract Planner getReplanner(String planQueueName);
+
+  protected abstract ReservationAgent getAgent(String queueName);
+
+  protected abstract SharingPolicy getAdmissionPolicy(String queueName);
+
+}

+ 146 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacityReservationSystem.java

@@ -0,0 +1,146 @@
+/**
+ *
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.reservation;
+
+import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This is the implementation of {@link ReservationSystem} based on the
+ * {@link CapacityScheduler}
+ */
+@LimitedPrivate("yarn")
+@Unstable
+public class CapacityReservationSystem extends AbstractReservationSystem {
+
+  private static final Logger LOG = LoggerFactory
+      .getLogger(CapacityReservationSystem.class);
+
+  private CapacityScheduler capScheduler;
+
+  public CapacityReservationSystem() {
+    super(CapacityReservationSystem.class.getName());
+  }
+
+  @Override
+  public void reinitialize(Configuration conf, RMContext rmContext)
+      throws YarnException {
+    // Validate if the scheduler is capacity based
+    ResourceScheduler scheduler = rmContext.getScheduler();
+    if (!(scheduler instanceof CapacityScheduler)) {
+      throw new YarnRuntimeException("Class "
+          + scheduler.getClass().getCanonicalName() + " not instance of "
+          + CapacityScheduler.class.getCanonicalName());
+    }
+    capScheduler = (CapacityScheduler) scheduler;
+    this.conf = conf;
+    super.reinitialize(conf, rmContext);
+  }
+
+  @Override
+  protected Plan initializePlan(String planQueueName) throws YarnException {
+    SharingPolicy adPolicy = getAdmissionPolicy(planQueueName);
+    String planQueuePath = capScheduler.getQueue(planQueueName).getQueuePath();
+    adPolicy.init(planQueuePath, capScheduler.getConfiguration());
+    CSQueue planQueue = capScheduler.getQueue(planQueueName);
+    // Calculate the max plan capacity
+    Resource minAllocation = capScheduler.getMinimumResourceCapability();
+    ResourceCalculator rescCalc = capScheduler.getResourceCalculator();
+    Resource totCap =
+        rescCalc.multiplyAndNormalizeDown(capScheduler.getClusterResource(),
+            planQueue.getAbsoluteCapacity(), minAllocation);
+    Plan plan =
+        new InMemoryPlan(capScheduler.getRootQueueMetrics(), adPolicy,
+            getAgent(planQueuePath), totCap, planStepSize, rescCalc,
+            minAllocation, capScheduler.getMaximumResourceCapability(),
+            planQueueName, getReplanner(planQueuePath), capScheduler
+                .getConfiguration().getMoveOnExpiry(planQueuePath));
+    LOG.info("Intialized plan {0} based on reservable queue {1}",
+        plan.toString(), planQueueName);
+    return plan;
+  }
+
+  @Override
+  protected Planner getReplanner(String planQueueName) {
+    CapacitySchedulerConfiguration capSchedulerConfig =
+        capScheduler.getConfiguration();
+    String plannerClassName = capSchedulerConfig.getReplanner(planQueueName);
+    LOG.info("Using Replanner: " + plannerClassName + " for queue: "
+        + planQueueName);
+    try {
+      Class<?> plannerClazz =
+          capSchedulerConfig.getClassByName(plannerClassName);
+      if (Planner.class.isAssignableFrom(plannerClazz)) {
+        Planner planner =
+            (Planner) ReflectionUtils.newInstance(plannerClazz, conf);
+        planner.init(planQueueName, capSchedulerConfig);
+        return planner;
+      } else {
+        throw new YarnRuntimeException("Class: " + plannerClazz
+            + " not instance of " + Planner.class.getCanonicalName());
+      }
+    } catch (ClassNotFoundException e) {
+      throw new YarnRuntimeException("Could not instantiate Planner: "
+          + plannerClassName + " for queue: " + planQueueName, e);
+    }
+  }
+
+  @Override
+  protected ReservationAgent getAgent(String queueName) {
+    CapacitySchedulerConfiguration capSchedulerConfig =
+        capScheduler.getConfiguration();
+    String agentClassName = capSchedulerConfig.getReservationAgent(queueName);
+    LOG.info("Using Agent: " + agentClassName + " for queue: " + queueName);
+    try {
+      Class<?> agentClazz = capSchedulerConfig.getClassByName(agentClassName);
+      if (ReservationAgent.class.isAssignableFrom(agentClazz)) {
+        return (ReservationAgent) ReflectionUtils.newInstance(agentClazz, conf);
+      } else {
+        throw new YarnRuntimeException("Class: " + agentClassName
+            + " not instance of " + ReservationAgent.class.getCanonicalName());
+      }
+    } catch (ClassNotFoundException e) {
+      throw new YarnRuntimeException("Could not instantiate Agent: "
+          + agentClassName + " for queue: " + queueName, e);
+    }
+  }
+
+  @Override
+  protected SharingPolicy getAdmissionPolicy(String queueName) {
+    CapacitySchedulerConfiguration capSchedulerConfig =
+        capScheduler.getConfiguration();
+    String admissionPolicyClassName =
+        capSchedulerConfig.getReservationAdmissionPolicy(queueName);
+    LOG.info("Using AdmissionPolicy: " + admissionPolicyClassName
+        + " for queue: " + queueName);
+    try {
+      Class<?> admissionPolicyClazz =
+          capSchedulerConfig.getClassByName(admissionPolicyClassName);
+      if (SharingPolicy.class.isAssignableFrom(admissionPolicyClazz)) {
+        return (SharingPolicy) ReflectionUtils.newInstance(
+            admissionPolicyClazz, conf);
+      } else {
+        throw new YarnRuntimeException("Class: " + admissionPolicyClassName
+            + " not instance of " + SharingPolicy.class.getCanonicalName());
+      }
+    } catch (ClassNotFoundException e) {
+      throw new YarnRuntimeException("Could not instantiate AdmissionPolicy: "
+          + admissionPolicyClassName + " for queue: " + queueName, e);
+    }
+  }
+
+}

+ 244 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationInputValidator.java

@@ -0,0 +1,244 @@
+package org.apache.hadoop.yarn.server.resourcemanager.reservation;
+
+import java.util.List;
+
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
+import org.apache.hadoop.yarn.api.records.ReservationDefinition;
+import org.apache.hadoop.yarn.api.records.ReservationId;
+import org.apache.hadoop.yarn.api.records.ReservationRequest;
+import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter;
+import org.apache.hadoop.yarn.api.records.ReservationRequests;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.ipc.RPCUtil;
+import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
+import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.hadoop.yarn.util.resource.Resources;
+
+public class ReservationInputValidator {
+
+  private final Clock clock;
+
+  /**
+   * Utility class to validate reservation requests.
+   */
+  public ReservationInputValidator(Clock clock) {
+    this.clock = clock;
+  }
+
+  private Plan validateReservation(ReservationSystem reservationSystem,
+      ReservationId reservationId, String auditConstant) throws YarnException {
+    String message = "";
+    // check if the reservation id is valid
+    if (reservationId == null) {
+      message =
+          "Missing reservation id."
+              + " Please try again by specifying a reservation id.";
+      RMAuditLogger.logFailure("UNKNOWN", auditConstant,
+          "validate reservation input", "ClientRMService", message);
+      throw RPCUtil.getRemoteException(message);
+    }
+    String queueName = reservationSystem.getQueueForReservation(reservationId);
+    if (queueName == null) {
+      message =
+          "The specified reservation with ID: " + reservationId
+              + " is unknown. Please try again with a valid reservation.";
+      RMAuditLogger.logFailure("UNKNOWN", auditConstant,
+          "validate reservation input", "ClientRMService", message);
+      throw RPCUtil.getRemoteException(message);
+    }
+    // check if the associated plan is valid
+    Plan plan = reservationSystem.getPlan(queueName);
+    if (plan == null) {
+      message =
+          "The specified reservation: " + reservationId
+              + " is not associated with any valid plan."
+              + " Please try again with a valid reservation.";
+      RMAuditLogger.logFailure("UNKNOWN", auditConstant,
+          "validate reservation input", "ClientRMService", message);
+      throw RPCUtil.getRemoteException(message);
+    }
+    return plan;
+  }
+
+  private void validateReservationDefinition(ReservationId reservationId,
+      ReservationDefinition contract, Plan plan, String auditConstant)
+      throws YarnException {
+    String message = "";
+    // check if deadline is in the past
+    if (contract == null) {
+      message =
+          "Missing reservation definition."
+              + " Please try again by specifying a reservation definition.";
+      RMAuditLogger.logFailure("UNKNOWN", auditConstant,
+          "validate reservation input definition", "ClientRMService", message);
+      throw RPCUtil.getRemoteException(message);
+    }
+    if (contract.getDeadline() <= clock.getTime()) {
+      message =
+          "The specified deadline: " + contract.getDeadline()
+              + " is the past. Please try again with deadline in the future.";
+      RMAuditLogger.logFailure("UNKNOWN", auditConstant,
+          "validate reservation input definition", "ClientRMService", message);
+      throw RPCUtil.getRemoteException(message);
+    }
+    // Check if at least one RR has been specified
+    ReservationRequests resReqs = contract.getReservationRequests();
+    if (resReqs == null) {
+      message =
+          "No resources have been specified to reserve."
+              + "Please try again by specifying the resources to reserve.";
+      RMAuditLogger.logFailure("UNKNOWN", auditConstant,
+          "validate reservation input definition", "ClientRMService", message);
+      throw RPCUtil.getRemoteException(message);
+    }
+    List<ReservationRequest> resReq = resReqs.getReservationResources();
+    if (resReq == null || resReq.isEmpty()) {
+      message =
+          "No resources have been specified to reserve."
+              + " Please try again by specifying the resources to reserve.";
+      RMAuditLogger.logFailure("UNKNOWN", auditConstant,
+          "validate reservation input definition", "ClientRMService", message);
+      throw RPCUtil.getRemoteException(message);
+    }
+    // compute minimum duration and max gang size
+    long minDuration = 0;
+    Resource maxGangSize = Resource.newInstance(0, 0);
+    ReservationRequestInterpreter type =
+        contract.getReservationRequests().getInterpreter();
+    for (ReservationRequest rr : resReq) {
+      if (type == ReservationRequestInterpreter.R_ALL
+          || type == ReservationRequestInterpreter.R_ANY) {
+        minDuration = Math.max(minDuration, rr.getDuration());
+      } else {
+        minDuration += rr.getDuration();
+      }
+      maxGangSize =
+          Resources.max(plan.getResourceCalculator(), plan.getTotalCapacity(),
+              maxGangSize,
+              Resources.multiply(rr.getCapability(), rr.getConcurrency()));
+    }
+    // verify the allocation is possible (skip for ANY)
+    if (contract.getDeadline() - contract.getArrival() < minDuration
+        && type != ReservationRequestInterpreter.R_ANY) {
+      message =
+          "The time difference ("
+              + (contract.getDeadline() - contract.getArrival())
+              + ") between arrival (" + contract.getArrival() + ") "
+              + "and deadline (" + contract.getDeadline() + ") must "
+              + " be greater or equal to the minimum resource duration ("
+              + minDuration + ")";
+      RMAuditLogger.logFailure("UNKNOWN", auditConstant,
+          "validate reservation input definition", "ClientRMService", message);
+      throw RPCUtil.getRemoteException(message);
+    }
+    // check that the largest gang does not exceed the inventory available
+    // capacity (skip for ANY)
+    if (Resources.greaterThan(plan.getResourceCalculator(),
+        plan.getTotalCapacity(), maxGangSize, plan.getTotalCapacity())
+        && type != ReservationRequestInterpreter.R_ANY) {
+      message =
+          "The size of the largest gang in the reservation refinition ("
+              + maxGangSize + ") exceed the capacity available ("
+              + plan.getTotalCapacity() + " )";
+      RMAuditLogger.logFailure("UNKNOWN", auditConstant,
+          "validate reservation input definition", "ClientRMService", message);
+      throw RPCUtil.getRemoteException(message);
+    }
+  }
+
+  /**
+   * Quick validation on the input to check some obvious fail conditions (fail
+   * fast) the input and returns the appropriate {@link Plan} associated with
+   * the specified {@link Queue} or throws an exception message illustrating the
+   * details of any validation check failures
+   * 
+   * @param reservationSystem the {@link ReservationSystem} to validate against
+   * @param request the {@link ReservationSubmissionRequest} defining the
+   *          resources required over time for the request
+   * @param reservationId the {@link ReservationId} associated with the current
+   *          request
+   * @return the {@link Plan} to submit the request to
+   * @throws YarnException
+   */
+  public Plan validateReservationSubmissionRequest(
+      ReservationSystem reservationSystem,
+      ReservationSubmissionRequest request, ReservationId reservationId)
+      throws YarnException {
+    // Check if it is a managed queue
+    String queueName = request.getQueue();
+    if (queueName == null || queueName.isEmpty()) {
+      String errMsg =
+          "The queue to submit is not specified."
+              + " Please try again with a valid reservable queue.";
+      RMAuditLogger.logFailure("UNKNOWN",
+          AuditConstants.SUBMIT_RESERVATION_REQUEST,
+          "validate reservation input", "ClientRMService", errMsg);
+      throw RPCUtil.getRemoteException(errMsg);
+    }
+    Plan plan = reservationSystem.getPlan(queueName);
+    if (plan == null) {
+      String errMsg =
+          "The specified queue: " + queueName
+              + " is not managed by reservation system."
+              + " Please try again with a valid reservable queue.";
+      RMAuditLogger.logFailure("UNKNOWN",
+          AuditConstants.SUBMIT_RESERVATION_REQUEST,
+          "validate reservation input", "ClientRMService", errMsg);
+      throw RPCUtil.getRemoteException(errMsg);
+    }
+    validateReservationDefinition(reservationId,
+        request.getReservationDefinition(), plan,
+        AuditConstants.SUBMIT_RESERVATION_REQUEST);
+    return plan;
+  }
+
+  /**
+   * Quick validation on the input to check some obvious fail conditions (fail
+   * fast) the input and returns the appropriate {@link Plan} associated with
+   * the specified {@link Queue} or throws an exception message illustrating the
+   * details of any validation check failures
+   * 
+   * @param reservationSystem the {@link ReservationSystem} to validate against
+   * @param request the {@link ReservationUpdateRequest} defining the resources
+   *          required over time for the request
+   * @return the {@link Plan} to submit the request to
+   * @throws YarnException
+   */
+  public Plan validateReservationUpdateRequest(
+      ReservationSystem reservationSystem, ReservationUpdateRequest request)
+      throws YarnException {
+    ReservationId reservationId = request.getReservationId();
+    Plan plan =
+        validateReservation(reservationSystem, reservationId,
+            AuditConstants.UPDATE_RESERVATION_REQUEST);
+    validateReservationDefinition(reservationId,
+        request.getReservationDefinition(), plan,
+        AuditConstants.UPDATE_RESERVATION_REQUEST);
+    return plan;
+  }
+
+  /**
+   * Quick validation on the input to check some obvious fail conditions (fail
+   * fast) the input and returns the appropriate {@link Plan} associated with
+   * the specified {@link Queue} or throws an exception message illustrating the
+   * details of any validation check failures
+   * 
+   * @param reservationSystem the {@link ReservationSystem} to validate against
+   * @param request the {@link ReservationDeleteRequest} defining the resources
+   *          required over time for the request
+   * @return the {@link Plan} to submit the request to
+   * @throws YarnException
+   */
+  public Plan validateReservationDeleteRequest(
+      ReservationSystem reservationSystem, ReservationDeleteRequest request)
+      throws YarnException {
+    return validateReservation(reservationSystem, request.getReservationId(),
+        AuditConstants.DELETE_RESERVATION_REQUEST);
+  }
+
+}

+ 125 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystem.java

@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.reservation;
+
+import java.util.Map;
+
+import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ReservationId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+
+/**
+ * This interface is the one implemented by any system that wants to support
+ * Reservations i.e. make {@link Resource} allocations in future. Implementors
+ * need to bootstrap all configured {@link Plan}s in the active
+ * {@link ResourceScheduler} along with their corresponding
+ * {@link ReservationAgent} and {@link SharingPolicy}. It is also responsible
+ * for managing the {@link PlanFollower} to ensure the {@link Plan}s are in sync
+ * with the {@link ResourceScheduler}.
+ */
+@LimitedPrivate("yarn")
+@Unstable
+public interface ReservationSystem {
+
+  /**
+   * Set RMContext for {@link ReservationSystem}. This method should be called
+   * immediately after instantiating a reservation system once.
+   * 
+   * @param rmContext created by {@link ResourceManager}
+   */
+  void setRMContext(RMContext rmContext);
+
+  /**
+   * Re-initialize the {@link ReservationSystem}.
+   * 
+   * @param conf configuration
+   * @param rmContext current context of the {@link ResourceManager}
+   * @throws YarnException
+   */
+  void reinitialize(Configuration conf, RMContext rmContext)
+      throws YarnException;
+
+  /**
+   * Get an existing {@link Plan} that has been initialized.
+   * 
+   * @param planName the name of the {@link Plan}
+   * @return the {@link Plan} identified by name
+   * 
+   */
+  Plan getPlan(String planName);
+
+  /**
+   * Return a map containing all the plans known to this ReservationSystem
+   * (useful for UI)
+   * 
+   * @return a Map of Plan names and Plan objects
+   */
+  Map<String, Plan> getAllPlans();
+
+  /**
+   * Invokes {@link PlanFollower} to synchronize the specified {@link Plan} with
+   * the {@link ResourceScheduler}
+   * 
+   * @param planName the name of the {@link Plan} to be synchronized
+   */
+  void synchronizePlan(String planName);
+
+  /**
+   * Return the time step (ms) at which the {@link PlanFollower} is invoked
+   * 
+   * @return the time step (ms) at which the {@link PlanFollower} is invoked
+   */
+  long getPlanFollowerTimeStep();
+
+  /**
+   * Get a new unique {@link ReservationId}.
+   * 
+   * @return a new unique {@link ReservationId}
+   * 
+   */
+  ReservationId getNewReservationId();
+
+  /**
+   * Get the {@link Queue} that an existing {@link ReservationId} is associated
+   * with.
+   * 
+   * @param reservationId the unique id of the reservation
+   * @return the name of the associated Queue
+   * 
+   */
+  String getQueueForReservation(ReservationId reservationId);
+
+  /**
+   * Set the {@link Queue} that an existing {@link ReservationId} should be
+   * associated with.
+   * 
+   * @param reservationId the unique id of the reservation
+   * @param queueName the name of Queue to associate the reservation with
+   * 
+   */
+  void setQueueForReservation(ReservationId reservationId, String queueName);
+
+}

+ 3 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java

@@ -29,6 +29,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.ReservationId;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
@@ -236,4 +237,6 @@ public interface RMApp extends EventHandler<RMAppEvent> {
    * @return metrics
    * @return metrics
    */
    */
   RMAppMetrics getRMAppMetrics();
   RMAppMetrics getRMAppMetrics();
+
+  ReservationId getReservationId();
 }
 }

+ 5 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java

@@ -46,6 +46,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeState;
 import org.apache.hadoop.yarn.api.records.NodeState;
+import org.apache.hadoop.yarn.api.records.ReservationId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -1284,4 +1285,8 @@ public class RMAppImpl implements RMApp, Recoverable {
     this.systemClock = clock;
     this.systemClock = clock;
   }
   }
 
 
+  @Override
+  public ReservationId getReservationId() {
+    return submissionContext.getReservationID();
+  }
 }
 }

+ 14 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAddedSchedulerEvent.java

@@ -19,25 +19,33 @@
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.event;
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.event;
 
 
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ReservationId;
 
 
 public class AppAddedSchedulerEvent extends SchedulerEvent {
 public class AppAddedSchedulerEvent extends SchedulerEvent {
 
 
   private final ApplicationId applicationId;
   private final ApplicationId applicationId;
   private final String queue;
   private final String queue;
   private final String user;
   private final String user;
+  private final ReservationId reservationID;
   private final boolean isAppRecovering;
   private final boolean isAppRecovering;
 
 
   public AppAddedSchedulerEvent(
   public AppAddedSchedulerEvent(
       ApplicationId applicationId, String queue, String user) {
       ApplicationId applicationId, String queue, String user) {
-    this(applicationId, queue, user, false);
+    this(applicationId, queue, user, false, null);
   }
   }
 
 
   public AppAddedSchedulerEvent(ApplicationId applicationId, String queue,
   public AppAddedSchedulerEvent(ApplicationId applicationId, String queue,
-      String user, boolean isAppRecovering) {
+      String user, ReservationId reservationID) {
+    this(applicationId, queue, user, false, reservationID);
+  }
+
+  public AppAddedSchedulerEvent(ApplicationId applicationId, String queue,
+      String user, boolean isAppRecovering, ReservationId reservationID) {
     super(SchedulerEventType.APP_ADDED);
     super(SchedulerEventType.APP_ADDED);
     this.applicationId = applicationId;
     this.applicationId = applicationId;
     this.queue = queue;
     this.queue = queue;
     this.user = user;
     this.user = user;
+    this.reservationID = reservationID;
     this.isAppRecovering = isAppRecovering;
     this.isAppRecovering = isAppRecovering;
   }
   }
 
 
@@ -56,4 +64,8 @@ public class AppAddedSchedulerEvent extends SchedulerEvent {
   public boolean getIsAppRecovering() {
   public boolean getIsAppRecovering() {
     return isAppRecovering;
     return isAppRecovering;
   }
   }
+
+  public ReservationId getReservationID() {
+    return reservationID;
+  }
 }
 }

+ 116 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java

@@ -33,6 +33,7 @@ import java.net.InetSocketAddress;
 import java.security.PrivilegedExceptionAction;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.EnumSet;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.HashSet;
@@ -74,6 +75,12 @@ import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@@ -91,7 +98,12 @@ import org.apache.hadoop.yarn.api.records.NodeReport;
 import org.apache.hadoop.yarn.api.records.NodeState;
 import org.apache.hadoop.yarn.api.records.NodeState;
 import org.apache.hadoop.yarn.api.records.QueueACL;
 import org.apache.hadoop.yarn.api.records.QueueACL;
 import org.apache.hadoop.yarn.api.records.QueueInfo;
 import org.apache.hadoop.yarn.api.records.QueueInfo;
+import org.apache.hadoop.yarn.api.records.ReservationDefinition;
+import org.apache.hadoop.yarn.api.records.ReservationId;
+import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter;
+import org.apache.hadoop.yarn.api.records.ReservationRequests;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ReservationRequest;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.Dispatcher;
@@ -107,6 +119,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWri
 import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
 import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystemTestUtil;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
@@ -117,11 +130,16 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
 import org.apache.hadoop.yarn.server.resourcemanager.security.QueueACLsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.security.QueueACLsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager;
 import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager;
 import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
 import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.Records;
 import org.apache.hadoop.yarn.util.Records;
+import org.apache.hadoop.yarn.util.SystemClock;
+import org.apache.hadoop.yarn.util.UTCClock;
 import org.apache.hadoop.yarn.util.resource.Resources;
 import org.apache.hadoop.yarn.util.resource.Resources;
 import org.junit.AfterClass;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.BeforeClass;
@@ -1199,4 +1217,102 @@ public class TestClientRMService {
     when(yarnScheduler.getAppResourceUsageReport(attemptId)).thenReturn(null);
     when(yarnScheduler.getAppResourceUsageReport(attemptId)).thenReturn(null);
     return yarnScheduler;
     return yarnScheduler;
   }
   }
+
+  @Test
+  public void testReservationAPIs() {
+    // initialize
+    CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
+    ReservationSystemTestUtil.setupQueueConfiguration(conf);
+    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+        ResourceScheduler.class);
+    conf.setBoolean(YarnConfiguration.RM_RESERVATION_SYSTEM_ENABLE, true);
+    MockRM rm = new MockRM(conf);
+    rm.start();
+    MockNM nm;
+    try {
+      nm = rm.registerNode("127.0.0.1:0", 102400, 100);
+      // allow plan follower to synchronize
+      Thread.sleep(1050);
+    } catch (Exception e) {
+      Assert.fail(e.getMessage());
+    }
+
+    // Create a client.
+    ClientRMService clientService = rm.getClientRMService();
+
+    // create a reservation
+    Clock clock = new UTCClock();
+    long arrival = clock.getTime();
+    long duration = 60000;
+    long deadline = (long) (arrival + 1.05 * duration);
+    ReservationSubmissionRequest sRequest =
+        createSimpleReservationRequest(4, arrival, deadline, duration);
+    ReservationSubmissionResponse sResponse = null;
+    try {
+      sResponse = clientService.submitReservation(sRequest);
+    } catch (Exception e) {
+      Assert.fail(e.getMessage());
+    }
+    Assert.assertNotNull(sResponse);
+    ReservationId reservationID = sResponse.getReservationId();
+    Assert.assertNotNull(reservationID);
+    LOG.info("Submit reservation response: " + reservationID);
+
+    // Update the reservation
+    ReservationDefinition rDef = sRequest.getReservationDefinition();
+    ReservationRequest rr =
+        rDef.getReservationRequests().getReservationResources().get(0);
+    rr.setNumContainers(5);
+    arrival = clock.getTime();
+    duration = 30000;
+    deadline = (long) (arrival + 1.05 * duration);
+    rr.setDuration(duration);
+    rDef.setArrival(arrival);
+    rDef.setDeadline(deadline);
+    ReservationUpdateRequest uRequest =
+        ReservationUpdateRequest.newInstance(rDef, reservationID);
+    ReservationUpdateResponse uResponse = null;
+    try {
+      uResponse = clientService.updateReservation(uRequest);
+    } catch (Exception e) {
+      Assert.fail(e.getMessage());
+    }
+    Assert.assertNotNull(sResponse);
+    LOG.info("Update reservation response: " + uResponse);
+
+    // Delete the reservation
+    ReservationDeleteRequest dRequest =
+        ReservationDeleteRequest.newInstance(reservationID);
+    ReservationDeleteResponse dResponse = null;
+    try {
+      dResponse = clientService.deleteReservation(dRequest);
+    } catch (Exception e) {
+      Assert.fail(e.getMessage());
+    }
+    Assert.assertNotNull(sResponse);
+    LOG.info("Delete reservation response: " + dResponse);
+
+    // clean-up
+    rm.stop();
+    nm = null;
+    rm = null;
+  }
+
+  private ReservationSubmissionRequest createSimpleReservationRequest(
+      int numContainers, long arrival, long deadline, long duration) {
+    // create a request with a single atomic ask
+    ReservationRequest r =
+        ReservationRequest.newInstance(Resource.newInstance(1024, 1),
+            numContainers, 1, duration);
+    ReservationRequests reqs =
+        ReservationRequests.newInstance(Collections.singletonList(r),
+            ReservationRequestInterpreter.R_ALL);
+    ReservationDefinition rDef =
+        ReservationDefinition.newInstance(arrival, deadline, reqs,
+            "testClientRMService#reservation");
+    ReservationSubmissionRequest request =
+        ReservationSubmissionRequest.newInstance(rDef,
+            ReservationSystemTestUtil.reservationQ);
+    return request;
+  }
 }
 }

+ 102 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacityReservationSystem.java

@@ -0,0 +1,102 @@
+package org.apache.hadoop.yarn.server.resourcemanager.reservation;
+
+import java.io.IOException;
+
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestCapacityReservationSystem {
+
+  @Test
+  public void testInitialize() {
+    ReservationSystemTestUtil testUtil = new ReservationSystemTestUtil();
+    CapacityScheduler capScheduler = null;
+    try {
+      capScheduler = testUtil.mockCapacityScheduler(10);
+    } catch (IOException e) {
+      Assert.fail(e.getMessage());
+    }
+    CapacityReservationSystem reservationSystem =
+        new CapacityReservationSystem();
+    reservationSystem.setRMContext(capScheduler.getRMContext());
+    try {
+      reservationSystem.reinitialize(capScheduler.getConf(),
+          capScheduler.getRMContext());
+    } catch (YarnException e) {
+      Assert.fail(e.getMessage());
+    }
+    String planQName = testUtil.getreservationQueueName();
+    Plan plan = reservationSystem.getPlan(planQName);
+    Assert.assertNotNull(plan);
+    Assert.assertTrue(plan instanceof InMemoryPlan);
+    Assert.assertEquals(planQName, plan.getQueueName());
+    Assert.assertEquals(8192, plan.getTotalCapacity().getMemory());
+    Assert
+        .assertTrue(plan.getReservationAgent() instanceof GreedyReservationAgent);
+    Assert
+        .assertTrue(plan.getSharingPolicy() instanceof CapacityOverTimePolicy);
+  }
+
+  @Test
+  public void testReinitialize() {
+    ReservationSystemTestUtil testUtil = new ReservationSystemTestUtil();
+    CapacityScheduler capScheduler = null;
+    try {
+      capScheduler = testUtil.mockCapacityScheduler(10);
+    } catch (IOException e) {
+      Assert.fail(e.getMessage());
+    }
+    CapacityReservationSystem reservationSystem =
+        new CapacityReservationSystem();
+    CapacitySchedulerConfiguration conf = capScheduler.getConfiguration();
+    RMContext mockContext = capScheduler.getRMContext();
+    reservationSystem.setRMContext(mockContext);
+    try {
+      reservationSystem.reinitialize(capScheduler.getConfiguration(),
+          mockContext);
+    } catch (YarnException e) {
+      Assert.fail(e.getMessage());
+    }
+    // Assert queue in original config
+    String planQName = testUtil.getreservationQueueName();
+    Plan plan = reservationSystem.getPlan(planQName);
+    Assert.assertNotNull(plan);
+    Assert.assertTrue(plan instanceof InMemoryPlan);
+    Assert.assertEquals(planQName, plan.getQueueName());
+    Assert.assertEquals(8192, plan.getTotalCapacity().getMemory());
+    Assert
+        .assertTrue(plan.getReservationAgent() instanceof GreedyReservationAgent);
+    Assert
+        .assertTrue(plan.getSharingPolicy() instanceof CapacityOverTimePolicy);
+
+    // Dynamically add a plan
+    String newQ = "reservation";
+    Assert.assertNull(reservationSystem.getPlan(newQ));
+    testUtil.updateQueueConfiguration(conf, newQ);
+    try {
+      capScheduler.reinitialize(conf, mockContext);
+    } catch (IOException e) {
+      Assert.fail(e.getMessage());
+    }
+    try {
+      reservationSystem.reinitialize(conf, mockContext);
+    } catch (YarnException e) {
+      Assert.fail(e.getMessage());
+    }
+    Plan newPlan = reservationSystem.getPlan(newQ);
+    Assert.assertNotNull(newPlan);
+    Assert.assertTrue(newPlan instanceof InMemoryPlan);
+    Assert.assertEquals(newQ, newPlan.getQueueName());
+    Assert.assertEquals(1024, newPlan.getTotalCapacity().getMemory());
+    Assert
+        .assertTrue(newPlan.getReservationAgent() instanceof GreedyReservationAgent);
+    Assert
+        .assertTrue(newPlan.getSharingPolicy() instanceof CapacityOverTimePolicy);
+
+  }
+
+}

+ 560 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestReservationInputValidator.java

@@ -0,0 +1,560 @@
+package org.apache.hadoop.yarn.server.resourcemanager.reservation;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.text.MessageFormat;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ReservationDeleteRequestPBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ReservationSubmissionRequestPBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ReservationUpdateRequestPBImpl;
+import org.apache.hadoop.yarn.api.records.ReservationDefinition;
+import org.apache.hadoop.yarn.api.records.ReservationId;
+import org.apache.hadoop.yarn.api.records.ReservationRequest;
+import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter;
+import org.apache.hadoop.yarn.api.records.ReservationRequests;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.impl.pb.ReservationDefinitionPBImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.ReservationRequestsPBImpl;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestReservationInputValidator {
+
+  private static final Log LOG = LogFactory
+      .getLog(TestReservationInputValidator.class);
+
+  private static final String PLAN_NAME = "test-reservation";
+
+  private Clock clock;
+  private Map<String, Plan> plans = new HashMap<String, Plan>(1);
+  private ReservationSystem rSystem;
+  private Plan plan;
+
+  private ReservationInputValidator rrValidator;
+
+  @Before
+  public void setUp() {
+    clock = mock(Clock.class);
+    plan = mock(Plan.class);
+    rSystem = mock(ReservationSystem.class);
+    plans.put(PLAN_NAME, plan);
+    rrValidator = new ReservationInputValidator(clock);
+    when(clock.getTime()).thenReturn(1L);
+    ResourceCalculator rCalc = new DefaultResourceCalculator();
+    Resource resource = Resource.newInstance(10240, 10);
+    when(plan.getResourceCalculator()).thenReturn(rCalc);
+    when(plan.getTotalCapacity()).thenReturn(resource);
+    when(rSystem.getQueueForReservation(any(ReservationId.class))).thenReturn(
+        PLAN_NAME);
+    when(rSystem.getPlan(PLAN_NAME)).thenReturn(plan);
+  }
+
+  @After
+  public void tearDown() {
+    rrValidator = null;
+    clock = null;
+    plan = null;
+  }
+
+  @Test
+  public void testSubmitReservationNormal() {
+    ReservationSubmissionRequest request =
+        createSimpleReservationSubmissionRequest(1, 1, 1, 5, 3);
+    Plan plan = null;
+    try {
+      plan =
+          rrValidator.validateReservationSubmissionRequest(rSystem, request,
+              ReservationSystemTestUtil.getNewReservationId());
+    } catch (YarnException e) {
+      Assert.fail(e.getMessage());
+    }
+    Assert.assertNotNull(plan);
+  }
+
+  @Test
+  public void testSubmitReservationDoesnotExist() {
+    ReservationSubmissionRequest request =
+        new ReservationSubmissionRequestPBImpl();
+    Plan plan = null;
+    try {
+      plan =
+          rrValidator.validateReservationSubmissionRequest(rSystem, request,
+              ReservationSystemTestUtil.getNewReservationId());
+      Assert.fail();
+    } catch (YarnException e) {
+      Assert.assertNull(plan);
+      String message = e.getMessage();
+      Assert
+          .assertTrue(message
+              .equals("The queue to submit is not specified. Please try again with a valid reservable queue."));
+      LOG.info(message);
+    }
+  }
+
+  @Test
+  public void testSubmitReservationInvalidPlan() {
+    ReservationSubmissionRequest request =
+        createSimpleReservationSubmissionRequest(1, 1, 1, 5, 3);
+    when(rSystem.getPlan(PLAN_NAME)).thenReturn(null);
+    Plan plan = null;
+    try {
+      plan =
+          rrValidator.validateReservationSubmissionRequest(rSystem, request,
+              ReservationSystemTestUtil.getNewReservationId());
+      Assert.fail();
+    } catch (YarnException e) {
+      Assert.assertNull(plan);
+      String message = e.getMessage();
+      Assert
+          .assertTrue(message
+              .endsWith(" is not managed by reservation system. Please try again with a valid reservable queue."));
+      LOG.info(message);
+    }
+  }
+
+  @Test
+  public void testSubmitReservationNoDefinition() {
+    ReservationSubmissionRequest request =
+        new ReservationSubmissionRequestPBImpl();
+    request.setQueue(PLAN_NAME);
+    Plan plan = null;
+    try {
+      plan =
+          rrValidator.validateReservationSubmissionRequest(rSystem, request,
+              ReservationSystemTestUtil.getNewReservationId());
+      Assert.fail();
+    } catch (YarnException e) {
+      Assert.assertNull(plan);
+      String message = e.getMessage();
+      Assert
+          .assertTrue(message
+              .equals("Missing reservation definition. Please try again by specifying a reservation definition."));
+      LOG.info(message);
+    }
+  }
+
+  @Test
+  public void testSubmitReservationInvalidDeadline() {
+    ReservationSubmissionRequest request =
+        createSimpleReservationSubmissionRequest(1, 1, 1, 0, 3);
+    Plan plan = null;
+    try {
+      plan =
+          rrValidator.validateReservationSubmissionRequest(rSystem, request,
+              ReservationSystemTestUtil.getNewReservationId());
+      Assert.fail();
+    } catch (YarnException e) {
+      Assert.assertNull(plan);
+      String message = e.getMessage();
+      Assert.assertTrue(message
+          .startsWith("The specified deadline: 0 is the past"));
+      LOG.info(message);
+    }
+  }
+
+  @Test
+  public void testSubmitReservationInvalidRR() {
+    ReservationSubmissionRequest request =
+        createSimpleReservationSubmissionRequest(0, 0, 1, 5, 3);
+    Plan plan = null;
+    try {
+      plan =
+          rrValidator.validateReservationSubmissionRequest(rSystem, request,
+              ReservationSystemTestUtil.getNewReservationId());
+      Assert.fail();
+    } catch (YarnException e) {
+      Assert.assertNull(plan);
+      String message = e.getMessage();
+      Assert.assertTrue(message
+          .startsWith("No resources have been specified to reserve"));
+      LOG.info(message);
+    }
+  }
+
+  @Test
+  public void testSubmitReservationEmptyRR() {
+    ReservationSubmissionRequest request =
+        createSimpleReservationSubmissionRequest(1, 0, 1, 5, 3);
+    Plan plan = null;
+    try {
+      plan =
+          rrValidator.validateReservationSubmissionRequest(rSystem, request,
+              ReservationSystemTestUtil.getNewReservationId());
+      Assert.fail();
+    } catch (YarnException e) {
+      Assert.assertNull(plan);
+      String message = e.getMessage();
+      Assert.assertTrue(message
+          .startsWith("No resources have been specified to reserve"));
+      LOG.info(message);
+    }
+  }
+
+  @Test
+  public void testSubmitReservationInvalidDuration() {
+    ReservationSubmissionRequest request =
+        createSimpleReservationSubmissionRequest(1, 1, 1, 3, 4);
+    Plan plan = null;
+    try {
+      plan =
+          rrValidator.validateReservationSubmissionRequest(rSystem, request,
+              ReservationSystemTestUtil.getNewReservationId());
+      Assert.fail();
+    } catch (YarnException e) {
+      Assert.assertNull(plan);
+      String message = e.getMessage();
+      Assert.assertTrue(message.startsWith("The time difference"));
+      Assert
+          .assertTrue(message
+              .contains("must  be greater or equal to the minimum resource duration"));
+      LOG.info(message);
+    }
+  }
+
+  @Test
+  public void testSubmitReservationExceedsGangSize() {
+    ReservationSubmissionRequest request =
+        createSimpleReservationSubmissionRequest(1, 1, 1, 5, 4);
+    Resource resource = Resource.newInstance(512, 1);
+    when(plan.getTotalCapacity()).thenReturn(resource);
+    Plan plan = null;
+    try {
+      plan =
+          rrValidator.validateReservationSubmissionRequest(rSystem, request,
+              ReservationSystemTestUtil.getNewReservationId());
+      Assert.fail();
+    } catch (YarnException e) {
+      Assert.assertNull(plan);
+      String message = e.getMessage();
+      Assert
+          .assertTrue(message
+              .startsWith("The size of the largest gang in the reservation refinition"));
+      Assert.assertTrue(message.contains("exceed the capacity available "));
+      LOG.info(message);
+    }
+  }
+
+  @Test
+  public void testUpdateReservationNormal() {
+    ReservationUpdateRequest request =
+        createSimpleReservationUpdateRequest(1, 1, 1, 5, 3);
+    Plan plan = null;
+    try {
+      plan = rrValidator.validateReservationUpdateRequest(rSystem, request);
+    } catch (YarnException e) {
+      Assert.fail(e.getMessage());
+    }
+    Assert.assertNotNull(plan);
+  }
+
+  @Test
+  public void testUpdateReservationNoID() {
+    ReservationUpdateRequest request = new ReservationUpdateRequestPBImpl();
+    Plan plan = null;
+    try {
+      plan = rrValidator.validateReservationUpdateRequest(rSystem, request);
+      Assert.fail();
+    } catch (YarnException e) {
+      Assert.assertNull(plan);
+      String message = e.getMessage();
+      Assert
+          .assertTrue(message
+              .startsWith("Missing reservation id. Please try again by specifying a reservation id."));
+      LOG.info(message);
+    }
+  }
+
+  @Test
+  public void testUpdateReservationDoesnotExist() {
+    ReservationUpdateRequest request =
+        createSimpleReservationUpdateRequest(1, 1, 1, 5, 4);
+    ReservationId rId = request.getReservationId();
+    when(rSystem.getQueueForReservation(rId)).thenReturn(null);
+    Plan plan = null;
+    try {
+      plan = rrValidator.validateReservationUpdateRequest(rSystem, request);
+      Assert.fail();
+    } catch (YarnException e) {
+      Assert.assertNull(plan);
+      String message = e.getMessage();
+      Assert
+          .assertTrue(message.equals(MessageFormat
+              .format(
+                  "The specified reservation with ID: {0} is unknown. Please try again with a valid reservation.",
+                  rId)));
+      LOG.info(message);
+    }
+  }
+
+  @Test
+  public void testUpdateReservationInvalidPlan() {
+    ReservationUpdateRequest request =
+        createSimpleReservationUpdateRequest(1, 1, 1, 5, 4);
+    when(rSystem.getPlan(PLAN_NAME)).thenReturn(null);
+    Plan plan = null;
+    try {
+      plan = rrValidator.validateReservationUpdateRequest(rSystem, request);
+      Assert.fail();
+    } catch (YarnException e) {
+      Assert.assertNull(plan);
+      String message = e.getMessage();
+      Assert
+          .assertTrue(message
+              .endsWith(" is not associated with any valid plan. Please try again with a valid reservation."));
+      LOG.info(message);
+    }
+  }
+
+  @Test
+  public void testUpdateReservationNoDefinition() {
+    ReservationUpdateRequest request = new ReservationUpdateRequestPBImpl();
+    request.setReservationId(ReservationSystemTestUtil.getNewReservationId());
+    Plan plan = null;
+    try {
+      plan = rrValidator.validateReservationUpdateRequest(rSystem, request);
+      Assert.fail();
+    } catch (YarnException e) {
+      Assert.assertNull(plan);
+      String message = e.getMessage();
+      Assert
+          .assertTrue(message
+              .startsWith("Missing reservation definition. Please try again by specifying a reservation definition."));
+      LOG.info(message);
+    }
+  }
+
+  @Test
+  public void testUpdateReservationInvalidDeadline() {
+    ReservationUpdateRequest request =
+        createSimpleReservationUpdateRequest(1, 1, 1, 0, 3);
+    Plan plan = null;
+    try {
+      plan = rrValidator.validateReservationUpdateRequest(rSystem, request);
+      Assert.fail();
+    } catch (YarnException e) {
+      Assert.assertNull(plan);
+      String message = e.getMessage();
+      Assert.assertTrue(message
+          .startsWith("The specified deadline: 0 is the past"));
+      LOG.info(message);
+    }
+  }
+
+  @Test
+  public void testUpdateReservationInvalidRR() {
+    ReservationUpdateRequest request =
+        createSimpleReservationUpdateRequest(0, 0, 1, 5, 3);
+    Plan plan = null;
+    try {
+      plan = rrValidator.validateReservationUpdateRequest(rSystem, request);
+      Assert.fail();
+    } catch (YarnException e) {
+      Assert.assertNull(plan);
+      String message = e.getMessage();
+      Assert.assertTrue(message
+          .startsWith("No resources have been specified to reserve"));
+      LOG.info(message);
+    }
+  }
+
+  @Test
+  public void testUpdateReservationEmptyRR() {
+    ReservationUpdateRequest request =
+        createSimpleReservationUpdateRequest(1, 0, 1, 5, 3);
+    Plan plan = null;
+    try {
+      plan = rrValidator.validateReservationUpdateRequest(rSystem, request);
+      Assert.fail();
+    } catch (YarnException e) {
+      Assert.assertNull(plan);
+      String message = e.getMessage();
+      Assert.assertTrue(message
+          .startsWith("No resources have been specified to reserve"));
+      LOG.info(message);
+    }
+  }
+
+  @Test
+  public void testUpdateReservationInvalidDuration() {
+    ReservationUpdateRequest request =
+        createSimpleReservationUpdateRequest(1, 1, 1, 3, 4);
+    Plan plan = null;
+    try {
+      plan = rrValidator.validateReservationUpdateRequest(rSystem, request);
+      Assert.fail();
+    } catch (YarnException e) {
+      Assert.assertNull(plan);
+      String message = e.getMessage();
+      Assert
+          .assertTrue(message
+              .contains("must  be greater or equal to the minimum resource duration"));
+      LOG.info(message);
+    }
+  }
+
+  @Test
+  public void testUpdateReservationExceedsGangSize() {
+    ReservationUpdateRequest request =
+        createSimpleReservationUpdateRequest(1, 1, 1, 5, 4);
+    Resource resource = Resource.newInstance(512, 1);
+    when(plan.getTotalCapacity()).thenReturn(resource);
+    Plan plan = null;
+    try {
+      plan = rrValidator.validateReservationUpdateRequest(rSystem, request);
+      Assert.fail();
+    } catch (YarnException e) {
+      Assert.assertNull(plan);
+      String message = e.getMessage();
+      Assert
+          .assertTrue(message
+              .startsWith("The size of the largest gang in the reservation refinition"));
+      Assert.assertTrue(message.contains("exceed the capacity available "));
+      LOG.info(message);
+    }
+  }
+
+  @Test
+  public void testDeleteReservationNormal() {
+    ReservationDeleteRequest request = new ReservationDeleteRequestPBImpl();
+    ReservationId reservationID =
+        ReservationSystemTestUtil.getNewReservationId();
+    request.setReservationId(reservationID);
+    ReservationAllocation reservation = mock(ReservationAllocation.class);
+    when(plan.getReservationById(reservationID)).thenReturn(reservation);
+    Plan plan = null;
+    try {
+      plan = rrValidator.validateReservationDeleteRequest(rSystem, request);
+    } catch (YarnException e) {
+      Assert.fail(e.getMessage());
+    }
+    Assert.assertNotNull(plan);
+  }
+
+  @Test
+  public void testDeleteReservationNoID() {
+    ReservationDeleteRequest request = new ReservationDeleteRequestPBImpl();
+    Plan plan = null;
+    try {
+      plan = rrValidator.validateReservationDeleteRequest(rSystem, request);
+      Assert.fail();
+    } catch (YarnException e) {
+      Assert.assertNull(plan);
+      String message = e.getMessage();
+      Assert
+          .assertTrue(message
+              .startsWith("Missing reservation id. Please try again by specifying a reservation id."));
+      LOG.info(message);
+    }
+  }
+
+  @Test
+  public void testDeleteReservationDoesnotExist() {
+    ReservationDeleteRequest request = new ReservationDeleteRequestPBImpl();
+    ReservationId rId = ReservationSystemTestUtil.getNewReservationId();
+    request.setReservationId(rId);
+    when(rSystem.getQueueForReservation(rId)).thenReturn(null);
+    Plan plan = null;
+    try {
+      plan = rrValidator.validateReservationDeleteRequest(rSystem, request);
+      Assert.fail();
+    } catch (YarnException e) {
+      Assert.assertNull(plan);
+      String message = e.getMessage();
+      Assert
+          .assertTrue(message.equals(MessageFormat
+              .format(
+                  "The specified reservation with ID: {0} is unknown. Please try again with a valid reservation.",
+                  rId)));
+      LOG.info(message);
+    }
+  }
+
+  @Test
+  public void testDeleteReservationInvalidPlan() {
+    ReservationDeleteRequest request = new ReservationDeleteRequestPBImpl();
+    ReservationId reservationID =
+        ReservationSystemTestUtil.getNewReservationId();
+    request.setReservationId(reservationID);
+    when(rSystem.getPlan(PLAN_NAME)).thenReturn(null);
+    Plan plan = null;
+    try {
+      plan = rrValidator.validateReservationDeleteRequest(rSystem, request);
+      Assert.fail();
+    } catch (YarnException e) {
+      Assert.assertNull(plan);
+      String message = e.getMessage();
+      Assert
+          .assertTrue(message
+              .endsWith(" is not associated with any valid plan. Please try again with a valid reservation."));
+      LOG.info(message);
+    }
+  }
+
+  private ReservationSubmissionRequest createSimpleReservationSubmissionRequest(
+      int numRequests, int numContainers, long arrival, long deadline,
+      long duration) {
+    // create a request with a single atomic ask
+    ReservationSubmissionRequest request =
+        new ReservationSubmissionRequestPBImpl();
+    ReservationDefinition rDef = new ReservationDefinitionPBImpl();
+    rDef.setArrival(arrival);
+    rDef.setDeadline(deadline);
+    if (numRequests > 0) {
+      ReservationRequests reqs = new ReservationRequestsPBImpl();
+      rDef.setReservationRequests(reqs);
+      if (numContainers > 0) {
+        ReservationRequest r =
+            ReservationRequest.newInstance(Resource.newInstance(1024, 1),
+                numContainers, 1, duration);
+
+        reqs.setReservationResources(Collections.singletonList(r));
+        reqs.setInterpreter(ReservationRequestInterpreter.R_ALL);
+      }
+    }
+    request.setQueue(PLAN_NAME);
+    request.setReservationDefinition(rDef);
+    return request;
+  }
+
+  private ReservationUpdateRequest createSimpleReservationUpdateRequest(
+      int numRequests, int numContainers, long arrival, long deadline,
+      long duration) {
+    // create a request with a single atomic ask
+    ReservationUpdateRequest request = new ReservationUpdateRequestPBImpl();
+    ReservationDefinition rDef = new ReservationDefinitionPBImpl();
+    rDef.setArrival(arrival);
+    rDef.setDeadline(deadline);
+    if (numRequests > 0) {
+      ReservationRequests reqs = new ReservationRequestsPBImpl();
+      rDef.setReservationRequests(reqs);
+      if (numContainers > 0) {
+        ReservationRequest r =
+            ReservationRequest.newInstance(Resource.newInstance(1024, 1),
+                numContainers, 1, duration);
+
+        reqs.setReservationResources(Collections.singletonList(r));
+        reqs.setInterpreter(ReservationRequestInterpreter.R_ALL);
+      }
+    }
+    request.setReservationDefinition(rDef);
+    request.setReservationId(ReservationSystemTestUtil.getNewReservationId());
+    return request;
+  }
+
+}