ソースを参照

YARN-1709. In-memory data structures used to track resources over time to enable reservations.
(cherry picked from commit 0d8b2cd88b958b1e602fd4ea4078ef8d4742a7c3)
(cherry picked from commit cf4b34282aafee9f6b09d3433c4de1ae4b359168)

subru 10 年 前
コミット
63250ef9d6
15 ファイル変更2534 行追加0 行削除
  1. 3 0
      YARN-1051-CHANGES.txt
  2. 507 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryPlan.java
  3. 151 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryReservationAllocation.java
  4. 32 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/Plan.java
  5. 101 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanContext.java
  6. 61 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanEdit.java
  7. 89 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanView.java
  8. 332 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/RLESparseResourceAllocation.java
  9. 104 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationAllocation.java
  10. 67 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationInterval.java
  11. 25 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/exceptions/PlanningException.java
  12. 210 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java
  13. 477 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestInMemoryPlan.java
  14. 206 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestInMemoryReservationAllocation.java
  15. 169 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestRLESparseResourceAllocation.java

+ 3 - 0
YARN-1051-CHANGES.txt

@@ -5,3 +5,6 @@ YARN-2475. Logic for responding to capacity drops for the
 ReservationSystem. (Carlo Curino and Subru Krishnan via curino)
 
 YARN-1708. Public YARN APIs for creating/updating/deleting reservations. (subru)
+
+YARN-1709. In-memory data structures used to track resources over time to
+enable reservations. (subru)

+ 507 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryPlan.java

@@ -0,0 +1,507 @@
+package org.apache.hadoop.yarn.server.resourcemanager.reservation;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.hadoop.yarn.api.records.ReservationId;
+import org.apache.hadoop.yarn.api.records.ReservationRequest;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.hadoop.yarn.util.UTCClock;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.Resources;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class InMemoryPlan implements Plan {
+
+  private static final Logger LOG = LoggerFactory.getLogger(InMemoryPlan.class);
+
+  private static final Resource ZERO_RESOURCE = Resource.newInstance(0, 0);
+
+  private TreeMap<ReservationInterval, Set<InMemoryReservationAllocation>> currentReservations =
+      new TreeMap<ReservationInterval, Set<InMemoryReservationAllocation>>();
+
+  private RLESparseResourceAllocation rleSparseVector;
+
+  private Map<String, RLESparseResourceAllocation> userResourceAlloc =
+      new HashMap<String, RLESparseResourceAllocation>();
+
+  private Map<ReservationId, InMemoryReservationAllocation> reservationTable =
+      new HashMap<ReservationId, InMemoryReservationAllocation>();
+
+  private final ReentrantReadWriteLock readWriteLock =
+      new ReentrantReadWriteLock();
+  private final Lock readLock = readWriteLock.readLock();
+  private final Lock writeLock = readWriteLock.writeLock();
+  private final SharingPolicy policy;
+  private final ReservationAgent agent;
+  private final long step;
+  private final ResourceCalculator resCalc;
+  private final Resource minAlloc, maxAlloc;
+  private final String queueName;
+  private final QueueMetrics queueMetrics;
+  private final Planner replanner;
+  private final boolean getMoveOnExpiry;
+  private final Clock clock;
+
+  private Resource totalCapacity;
+
+  InMemoryPlan(QueueMetrics queueMetrics, SharingPolicy policy,
+      ReservationAgent agent, Resource totalCapacity, long step,
+      ResourceCalculator resCalc, Resource minAlloc, Resource maxAlloc,
+      String queueName, Planner replanner, boolean getMoveOnExpiry) {
+    this(queueMetrics, policy, agent, totalCapacity, step, resCalc, minAlloc,
+        maxAlloc, queueName, replanner, getMoveOnExpiry, new UTCClock());
+  }
+
+  InMemoryPlan(QueueMetrics queueMetrics, SharingPolicy policy,
+      ReservationAgent agent, Resource totalCapacity, long step,
+      ResourceCalculator resCalc, Resource minAlloc, Resource maxAlloc,
+      String queueName, Planner replanner, boolean getMoveOnExpiry, Clock clock) {
+    this.queueMetrics = queueMetrics;
+    this.policy = policy;
+    this.agent = agent;
+    this.step = step;
+    this.totalCapacity = totalCapacity;
+    this.resCalc = resCalc;
+    this.minAlloc = minAlloc;
+    this.maxAlloc = maxAlloc;
+    this.rleSparseVector = new RLESparseResourceAllocation(resCalc, minAlloc);
+    this.queueName = queueName;
+    this.replanner = replanner;
+    this.getMoveOnExpiry = getMoveOnExpiry;
+    this.clock = clock;
+  }
+
+  @Override
+  public QueueMetrics getQueueMetrics() {
+    return queueMetrics;
+  }
+
+  private void incrementAllocation(ReservationAllocation reservation) {
+    assert (readWriteLock.isWriteLockedByCurrentThread());
+    Map<ReservationInterval, ReservationRequest> allocationRequests =
+        reservation.getAllocationRequests();
+    // check if we have encountered the user earlier and if not add an entry
+    String user = reservation.getUser();
+    RLESparseResourceAllocation resAlloc = userResourceAlloc.get(user);
+    if (resAlloc == null) {
+      resAlloc = new RLESparseResourceAllocation(resCalc, minAlloc);
+      userResourceAlloc.put(user, resAlloc);
+    }
+    for (Map.Entry<ReservationInterval, ReservationRequest> r : allocationRequests
+        .entrySet()) {
+      resAlloc.addInterval(r.getKey(), r.getValue());
+      rleSparseVector.addInterval(r.getKey(), r.getValue());
+    }
+  }
+
+  private void decrementAllocation(ReservationAllocation reservation) {
+    assert (readWriteLock.isWriteLockedByCurrentThread());
+    Map<ReservationInterval, ReservationRequest> allocationRequests =
+        reservation.getAllocationRequests();
+    String user = reservation.getUser();
+    RLESparseResourceAllocation resAlloc = userResourceAlloc.get(user);
+    for (Map.Entry<ReservationInterval, ReservationRequest> r : allocationRequests
+        .entrySet()) {
+      resAlloc.removeInterval(r.getKey(), r.getValue());
+      rleSparseVector.removeInterval(r.getKey(), r.getValue());
+    }
+    if (resAlloc.isEmpty()) {
+      userResourceAlloc.remove(resAlloc);
+    }
+  }
+
+  public Set<ReservationAllocation> getAllReservations() {
+    readLock.lock();
+    try {
+      if (currentReservations != null) {
+        Set<ReservationAllocation> flattenedReservations =
+            new HashSet<ReservationAllocation>();
+        for (Set<InMemoryReservationAllocation> reservationEntries : currentReservations
+            .values()) {
+          flattenedReservations.addAll(reservationEntries);
+        }
+        return flattenedReservations;
+      } else {
+        return null;
+      }
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  @Override
+  public boolean addReservation(ReservationAllocation reservation)
+      throws PlanningException {
+    // Verify the allocation is memory based otherwise it is not supported
+    InMemoryReservationAllocation inMemReservation =
+        (InMemoryReservationAllocation) reservation;
+    if (inMemReservation.getUser() == null) {
+      String errMsg =
+          "The specified Reservation with ID "
+              + inMemReservation.getReservationId()
+              + " is not mapped to any user";
+      LOG.error(errMsg);
+      throw new IllegalArgumentException(errMsg);
+    }
+    writeLock.lock();
+    try {
+      if (reservationTable.containsKey(inMemReservation.getReservationId())) {
+        String errMsg =
+            "The specified Reservation with ID "
+                + inMemReservation.getReservationId() + " already exists";
+        LOG.error(errMsg);
+        throw new IllegalArgumentException(errMsg);
+      }
+      // Validate if we can accept this reservation, throws exception if
+      // validation fails
+      policy.validate(this, inMemReservation);
+      // we record here the time in which the allocation has been accepted
+      reservation.setAcceptanceTimestamp(clock.getTime());
+      ReservationInterval searchInterval =
+          new ReservationInterval(inMemReservation.getStartTime(),
+              inMemReservation.getEndTime());
+      Set<InMemoryReservationAllocation> reservations =
+          currentReservations.get(searchInterval);
+      if (reservations == null) {
+        reservations = new HashSet<InMemoryReservationAllocation>();
+      }
+      if (!reservations.add(inMemReservation)) {
+        LOG.error("Unable to add reservation: {} to plan.",
+            inMemReservation.getReservationId());
+        return false;
+      }
+      currentReservations.put(searchInterval, reservations);
+      reservationTable.put(inMemReservation.getReservationId(),
+          inMemReservation);
+      incrementAllocation(inMemReservation);
+      LOG.info("Sucessfully added reservation: {} to plan.",
+          inMemReservation.getReservationId());
+      return true;
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  @Override
+  public boolean updateReservation(ReservationAllocation reservation)
+      throws PlanningException {
+    writeLock.lock();
+    boolean result = false;
+    try {
+      ReservationId resId = reservation.getReservationId();
+      ReservationAllocation currReservation = getReservationById(resId);
+      if (currReservation == null) {
+        String errMsg =
+            "The specified Reservation with ID " + resId
+                + " does not exist in the plan";
+        LOG.error(errMsg);
+        throw new IllegalArgumentException(errMsg);
+      }
+      if (!removeReservation(currReservation)) {
+        LOG.error("Unable to replace reservation: {} from plan.",
+            reservation.getReservationId());
+        return result;
+      }
+      try {
+        result = addReservation(reservation);
+      } catch (PlanningException e) {
+        LOG.error("Unable to update reservation: {} from plan due to {}.",
+            reservation.getReservationId(), e.getMessage());
+      }
+      if (result) {
+        LOG.info("Sucessfully updated reservation: {} in plan.",
+            reservation.getReservationId());
+        return result;
+      } else {
+        // rollback delete
+        addReservation(currReservation);
+        LOG.info("Rollbacked update reservation: {} from plan.",
+            reservation.getReservationId());
+        return result;
+      }
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  private boolean removeReservation(ReservationAllocation reservation) {
+    assert (readWriteLock.isWriteLockedByCurrentThread());
+    ReservationInterval searchInterval =
+        new ReservationInterval(reservation.getStartTime(),
+            reservation.getEndTime());
+    Set<InMemoryReservationAllocation> reservations =
+        currentReservations.get(searchInterval);
+    if (reservations != null) {
+      if (!reservations.remove(reservation)) {
+        LOG.error("Unable to remove reservation: {} from plan.",
+            reservation.getReservationId());
+        return false;
+      }
+      if (reservations.isEmpty()) {
+        currentReservations.remove(searchInterval);
+      }
+    } else {
+      String errMsg =
+          "The specified Reservation with ID " + reservation.getReservationId()
+              + " does not exist in the plan";
+      LOG.error(errMsg);
+      throw new IllegalArgumentException(errMsg);
+    }
+    reservationTable.remove(reservation.getReservationId());
+    decrementAllocation(reservation);
+    LOG.info("Sucessfully deleted reservation: {} in plan.",
+        reservation.getReservationId());
+    return true;
+  }
+
+  @Override
+  public boolean deleteReservation(ReservationId reservationID) {
+    writeLock.lock();
+    try {
+      ReservationAllocation reservation = getReservationById(reservationID);
+      if (reservation == null) {
+        String errMsg =
+            "The specified Reservation with ID " + reservationID
+                + " does not exist in the plan";
+        LOG.error(errMsg);
+        throw new IllegalArgumentException(errMsg);
+      }
+      return removeReservation(reservation);
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  @Override
+  public void archiveCompletedReservations(long tick) {
+    // Since we are looking for old reservations, read lock is optimal
+    LOG.debug("Running archival at time: {}", tick);
+    readLock.lock();
+    List<InMemoryReservationAllocation> expiredReservations =
+        new ArrayList<InMemoryReservationAllocation>();
+    // archive reservations and delete the ones which are beyond
+    // the reservation policy "window"
+    try {
+      long archivalTime = tick - policy.getValidWindow();
+      ReservationInterval searchInterval =
+          new ReservationInterval(archivalTime, archivalTime);
+      SortedMap<ReservationInterval, Set<InMemoryReservationAllocation>> reservations =
+          currentReservations.headMap(searchInterval, true);
+      if (!reservations.isEmpty()) {
+        for (Set<InMemoryReservationAllocation> reservationEntries : reservations
+            .values()) {
+          for (InMemoryReservationAllocation reservation : reservationEntries) {
+            if (reservation.getEndTime() <= archivalTime) {
+              expiredReservations.add(reservation);
+            }
+          }
+        }
+      }
+    } finally {
+      readLock.unlock();
+    }
+    if (expiredReservations.isEmpty()) {
+      return;
+    }
+    // Need write lock only if there are any reservations to be deleted
+    writeLock.lock();
+    try {
+      for (InMemoryReservationAllocation expiredReservation : expiredReservations) {
+        removeReservation(expiredReservation);
+      }
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  @Override
+  public Set<ReservationAllocation> getReservationsAtTime(long tick) {
+    readLock.lock();
+    ReservationInterval searchInterval =
+        new ReservationInterval(tick, Long.MAX_VALUE);
+    try {
+      SortedMap<ReservationInterval, Set<InMemoryReservationAllocation>> reservations =
+          currentReservations.headMap(searchInterval, true);
+      if (!reservations.isEmpty()) {
+        Set<ReservationAllocation> flattenedReservations =
+            new HashSet<ReservationAllocation>();
+        for (Set<InMemoryReservationAllocation> reservationEntries : reservations
+            .values()) {
+          for (InMemoryReservationAllocation reservation : reservationEntries) {
+            if (reservation.getEndTime() > tick) {
+              flattenedReservations.add(reservation);
+            }
+          }
+        }
+        return Collections.unmodifiableSet(flattenedReservations);
+      } else {
+        return Collections.emptySet();
+      }
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  @Override
+  public long getStep() {
+    return step;
+  }
+
+  @Override
+  public SharingPolicy getSharingPolicy() {
+    return policy;
+  }
+
+  @Override
+  public ReservationAgent getReservationAgent() {
+    return agent;
+  }
+
+  @Override
+  public Resource getConsumptionForUser(String user, long t) {
+    readLock.lock();
+    try {
+      RLESparseResourceAllocation userResAlloc = userResourceAlloc.get(user);
+      if (userResAlloc != null) {
+        return userResAlloc.getCapacityAtTime(t);
+      } else {
+        return Resources.clone(ZERO_RESOURCE);
+      }
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  @Override
+  public Resource getTotalCommittedResources(long t) {
+    readLock.lock();
+    try {
+      return rleSparseVector.getCapacityAtTime(t);
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  @Override
+  public ReservationAllocation getReservationById(ReservationId reservationID) {
+    if (reservationID == null) {
+      return null;
+    }
+    readLock.lock();
+    try {
+      return reservationTable.get(reservationID);
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  @Override
+  public Resource getTotalCapacity() {
+    readLock.lock();
+    try {
+      return Resources.clone(totalCapacity);
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  @Override
+  public Resource getMinimumAllocation() {
+    return Resources.clone(minAlloc);
+  }
+
+  @Override
+  public void setTotalCapacity(Resource cap) {
+    writeLock.lock();
+    try {
+      totalCapacity = Resources.clone(cap);
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  public long getEarliestStartTime() {
+    readLock.lock();
+    try {
+      return rleSparseVector.getEarliestStartTime();
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  @Override
+  public long getLastEndTime() {
+    readLock.lock();
+    try {
+      return rleSparseVector.getLatestEndTime();
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  @Override
+  public ResourceCalculator getResourceCalculator() {
+    return resCalc;
+  }
+
+  @Override
+  public String getQueueName() {
+    return queueName;
+  }
+
+  @Override
+  public Resource getMaximumAllocation() {
+    return Resources.clone(maxAlloc);
+  }
+
+  public String toCumulativeString() {
+    readLock.lock();
+    try {
+      return rleSparseVector.toString();
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  @Override
+  public Planner getReplanner() {
+    return replanner;
+  }
+
+  @Override
+  public boolean getMoveOnExpiry() {
+    return getMoveOnExpiry;
+  }
+
+  @Override
+  public String toString() {
+    readLock.lock();
+    try {
+      StringBuffer planStr = new StringBuffer("In-memory Plan: ");
+      planStr.append("Parent Queue: ").append(queueName)
+          .append("Total Capacity: ").append(totalCapacity).append("Step: ")
+          .append(step);
+      for (ReservationAllocation reservation : getAllReservations()) {
+        planStr.append(reservation);
+      }
+      return planStr.toString();
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+}

+ 151 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryReservationAllocation.java

@@ -0,0 +1,151 @@
+package org.apache.hadoop.yarn.server.resourcemanager.reservation;
+
+import java.util.Collections;
+import java.util.Map;
+
+import org.apache.hadoop.yarn.api.records.ReservationDefinition;
+import org.apache.hadoop.yarn.api.records.ReservationId;
+import org.apache.hadoop.yarn.api.records.ReservationRequest;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.Resources;
+
+/**
+ * An in memory implementation of a reservation allocation using the
+ * {@link RLESparseResourceAllocation}
+ * 
+ */
+class InMemoryReservationAllocation implements ReservationAllocation {
+
+  private final String planName;
+  private final ReservationId reservationID;
+  private final String user;
+  private final ReservationDefinition contract;
+  private final long startTime;
+  private final long endTime;
+  private final Map<ReservationInterval, ReservationRequest> allocationRequests;
+  private boolean hasGang = false;
+  private long acceptedAt = -1;
+
+  private RLESparseResourceAllocation resourcesOverTime;
+
+  InMemoryReservationAllocation(ReservationId reservationID,
+      ReservationDefinition contract, String user, String planName,
+      long startTime, long endTime,
+      Map<ReservationInterval, ReservationRequest> allocationRequests,
+      ResourceCalculator calculator, Resource minAlloc) {
+    this.contract = contract;
+    this.startTime = startTime;
+    this.endTime = endTime;
+    this.reservationID = reservationID;
+    this.user = user;
+    this.allocationRequests = allocationRequests;
+    this.planName = planName;
+    resourcesOverTime = new RLESparseResourceAllocation(calculator, minAlloc);
+    for (Map.Entry<ReservationInterval, ReservationRequest> r : allocationRequests
+        .entrySet()) {
+      resourcesOverTime.addInterval(r.getKey(), r.getValue());
+      if (r.getValue().getConcurrency() > 1) {
+        hasGang = true;
+      }
+    }
+  }
+
+  @Override
+  public ReservationId getReservationId() {
+    return reservationID;
+  }
+
+  @Override
+  public ReservationDefinition getReservationDefinition() {
+    return contract;
+  }
+
+  @Override
+  public long getStartTime() {
+    return startTime;
+  }
+
+  @Override
+  public long getEndTime() {
+    return endTime;
+  }
+
+  @Override
+  public Map<ReservationInterval, ReservationRequest> getAllocationRequests() {
+    return Collections.unmodifiableMap(allocationRequests);
+  }
+
+  @Override
+  public String getPlanName() {
+    return planName;
+  }
+
+  @Override
+  public String getUser() {
+    return user;
+  }
+
+  @Override
+  public boolean containsGangs() {
+    return hasGang;
+  }
+
+  @Override
+  public void setAcceptanceTimestamp(long acceptedAt) {
+    this.acceptedAt = acceptedAt;
+  }
+
+  @Override
+  public long getAcceptanceTime() {
+    return acceptedAt;
+  }
+
+  @Override
+  public Resource getResourcesAtTime(long tick) {
+    if (tick < startTime || tick >= endTime) {
+      return Resource.newInstance(0, 0);
+    }
+    return Resources.clone(resourcesOverTime.getCapacityAtTime(tick));
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sBuf = new StringBuilder();
+    sBuf.append(getReservationId()).append(" user:").append(getUser())
+        .append(" startTime: ").append(getStartTime()).append(" endTime: ")
+        .append(getEndTime()).append(" alloc:[")
+        .append(resourcesOverTime.toString()).append("] ");
+    return sBuf.toString();
+  }
+
+  @Override
+  public int compareTo(ReservationAllocation other) {
+    // reverse order of acceptance
+    if (this.getAcceptanceTime() > other.getAcceptanceTime()) {
+      return -1;
+    }
+    if (this.getAcceptanceTime() < other.getAcceptanceTime()) {
+      return 1;
+    }
+    return 0;
+  }
+
+  @Override
+  public int hashCode() {
+    return reservationID.hashCode();
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj)
+      return true;
+    if (obj == null)
+      return false;
+    if (getClass() != obj.getClass())
+      return false;
+    InMemoryReservationAllocation other = (InMemoryReservationAllocation) obj;
+    return this.reservationID.equals(other.getReservationId());
+  }
+
+}

+ 32 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/Plan.java

@@ -0,0 +1,32 @@
+package org.apache.hadoop.yarn.server.resourcemanager.reservation;
+
+/**
+ * A Plan represents the central data structure of a reservation system that
+ * maintains the "agenda" for the cluster. In particular, it maintains
+ * information on how a set of {@link ReservationDefinition} that have been
+ * previously accepted will be honored.
+ * 
+ * {@link ReservationDefinition} submitted by the users through the RM public
+ * APIs are passed to appropriate {@link ReservationAgent}s, which in turn will
+ * consult the Plan (via the {@link PlanView} interface) and try to determine
+ * whether there are sufficient resources available in this Plan to satisfy the
+ * temporal and resource constraints of a {@link ReservationDefinition}. If a
+ * valid allocation is found the agent will try to store it in the plan (via the
+ * {@link PlanEdit} interface). Upon success the system return to the user a
+ * positive acknowledgment, and a reservation identifier to be later used to
+ * access the reserved resources.
+ * 
+ * A {@link PlanFollower} will continuously read from the Plan and will
+ * affect the instantaneous allocation of resources among jobs running by
+ * publishing the "current" slice of the Plan to the underlying scheduler. I.e.,
+ * the configuration of queues/weights of the scheduler are modified to reflect
+ * the allocations in the Plan.
+ * 
+ * As this interface have several methods we decompose them into three groups:
+ * {@link PlanContext}: containing configuration type information,
+ * {@link PlanView} read-only access to the plan state, and {@link PlanEdit}
+ * write access to the plan state.
+ */
+public interface Plan extends PlanContext, PlanView, PlanEdit {
+
+}

+ 101 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanContext.java

@@ -0,0 +1,101 @@
+package org.apache.hadoop.yarn.server.resourcemanager.reservation;
+
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+
+/**
+ * This interface provides read-only access to configuration-type parameter for
+ * a plan.
+ * 
+ */
+public interface PlanContext {
+
+  /**
+   * Returns the configured "step" or granularity of time of the plan in millis.
+   * 
+   * @return plan step in millis
+   */
+  public long getStep();
+
+  /**
+   * Return the {@link ReservationAgent} configured for this plan that is
+   * responsible for optimally placing various reservation requests
+   * 
+   * @return the {@link ReservationAgent} configured for this plan
+   */
+  public ReservationAgent getReservationAgent();
+
+  /**
+   * Return an instance of a {@link Planner}, which will be invoked in response
+   * to unexpected reduction in the resources of this plan
+   * 
+   * @return an instance of a {@link Planner}, which will be invoked in response
+   *         to unexpected reduction in the resources of this plan
+   */
+  public Planner getReplanner();
+
+  /**
+   * Return the configured {@link SharingPolicy} that governs the sharing of the
+   * resources of the plan between its various users
+   * 
+   * @return the configured {@link SharingPolicy} that governs the sharing of
+   *         the resources of the plan between its various users
+   */
+  public SharingPolicy getSharingPolicy();
+
+  /**
+   * Returns the system {@link ResourceCalculator}
+   * 
+   * @return the system {@link ResourceCalculator}
+   */
+  public ResourceCalculator getResourceCalculator();
+
+  /**
+   * Returns the single smallest {@link Resource} allocation that can be
+   * reserved in this plan
+   * 
+   * @return the single smallest {@link Resource} allocation that can be
+   *         reserved in this plan
+   */
+  public Resource getMinimumAllocation();
+
+  /**
+   * Returns the single largest {@link Resource} allocation that can be reserved
+   * in this plan
+   * 
+   * @return the single largest {@link Resource} allocation that can be reserved
+   *         in this plan
+   */
+  public Resource getMaximumAllocation();
+
+  /**
+   * Return the name of the queue in the {@link ResourceScheduler} corresponding
+   * to this plan
+   * 
+   * @return the name of the queue in the {@link ResourceScheduler}
+   *         corresponding to this plan
+   */
+  public String getQueueName();
+
+  /**
+   * Return the {@link QueueMetrics} for the queue in the
+   * {@link ResourceScheduler} corresponding to this plan
+   * 
+   * @return the {@link QueueMetrics} for the queue in the
+   *         {@link ResourceScheduler} corresponding to this plan
+   */
+  public QueueMetrics getQueueMetrics();
+
+  /**
+   * Instructs the {@link PlanFollower} on what to do for applications
+   * which are still running when the reservation is expiring (move-to-default
+   * vs kill)
+   * 
+   * @return true if remaining applications have to be killed, false if they
+   *         have to migrated
+   */
+  public boolean getMoveOnExpiry();
+
+}

+ 61 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanEdit.java

@@ -0,0 +1,61 @@
+package org.apache.hadoop.yarn.server.resourcemanager.reservation;
+
+import org.apache.hadoop.yarn.api.records.ReservationId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
+
+/**
+ * This interface groups the methods used to modify the state of a Plan.
+ */
+public interface PlanEdit extends PlanContext, PlanView {
+
+  /**
+   * Add a new {@link ReservationAllocation} to the plan
+   * 
+   * @param reservation the {@link ReservationAllocation} to be added to the
+   *          plan
+   * @return true if addition is successful, false otherwise
+   */
+  public boolean addReservation(ReservationAllocation reservation)
+      throws PlanningException;
+
+  /**
+   * Updates an existing {@link ReservationAllocation} in the plan. This is
+   * required for re-negotiation
+   * 
+   * @param reservation the {@link ReservationAllocation} to be updated the plan
+   * @return true if update is successful, false otherwise
+   */
+  public boolean updateReservation(ReservationAllocation reservation)
+      throws PlanningException;
+
+  /**
+   * Delete an existing {@link ReservationAllocation} from the plan identified
+   * uniquely by its {@link ReservationId}. This will generally be used for
+   * garbage collection
+   * 
+   * @param reservation the {@link ReservationAllocation} to be deleted from the
+   *          plan identified uniquely by its {@link ReservationId}
+   * @return true if delete is successful, false otherwise
+   */
+  public boolean deleteReservation(ReservationId reservationID)
+      throws PlanningException;
+
+  /**
+   * Method invoked to garbage collect old reservations. It cleans up expired
+   * reservations that have fallen out of the sliding archival window
+   * 
+   * @param tick the current time from which the archival window is computed
+   */
+  public void archiveCompletedReservations(long tick) throws PlanningException;
+
+  /**
+   * Sets the overall capacity in terms of {@link Resource} assigned to this
+   * plan
+   * 
+   * @param capacity the overall capacity in terms of {@link Resource} assigned
+   *          to this plan
+   */
+  public void setTotalCapacity(Resource capacity);
+
+}

+ 89 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanView.java

@@ -0,0 +1,89 @@
+package org.apache.hadoop.yarn.server.resourcemanager.reservation;
+
+import java.util.Set;
+
+import org.apache.hadoop.yarn.api.records.ReservationId;
+import org.apache.hadoop.yarn.api.records.Resource;
+
+/**
+ * This interface provides a read-only view on the allocations made in this
+ * plan. This methods are used for example by {@link ReservationAgent}s to
+ * determine the free resources in a certain point in time, and by
+ * PlanFollowerPolicy to publish this plan to the scheduler.
+ */
+public interface PlanView extends PlanContext {
+
+  /**
+   * Return a {@link ReservationAllocation} identified by its
+   * {@link ReservationId}
+   * 
+   * @param reservationID the unique id to identify the
+   *          {@link ReservationAllocation}
+   * @return {@link ReservationAllocation} identified by the specified id
+   */
+  public ReservationAllocation getReservationById(ReservationId reservationID);
+
+  /**
+   * Gets all the active reservations at the specified point of time
+   * 
+   * @param tick the time (UTC in ms) for which the active reservations are
+   *          requested
+   * @return set of active reservations at the specified time
+   */
+  public Set<ReservationAllocation> getReservationsAtTime(long tick);
+
+  /**
+   * Gets all the reservations in the plan
+   * 
+   * @return set of all reservations handled by this Plan
+   */
+  public Set<ReservationAllocation> getAllReservations();
+
+  /**
+   * Returns the total {@link Resource} reserved for all users at the specified
+   * time
+   * 
+   * @param tick the time (UTC in ms) for which the reserved resources are
+   *          requested
+   * @return the total {@link Resource} reserved for all users at the specified
+   *         time
+   */
+  public Resource getTotalCommittedResources(long tick);
+
+  /**
+   * Returns the total {@link Resource} reserved for a given user at the
+   * specified time
+   * 
+   * @param user the user who made the reservation(s)
+   * @param tick the time (UTC in ms) for which the reserved resources are
+   *          requested
+   * @return the total {@link Resource} reserved for a given user at the
+   *         specified time
+   */
+  public Resource getConsumptionForUser(String user, long tick);
+
+  /**
+   * Returns the overall capacity in terms of {@link Resource} assigned to this
+   * plan (typically will correspond to the absolute capacity of the
+   * corresponding queue).
+   * 
+   * @return the overall capacity in terms of {@link Resource} assigned to this
+   *         plan
+   */
+  public Resource getTotalCapacity();
+
+  /**
+   * Gets the time (UTC in ms) at which the first reservation starts
+   * 
+   * @return the time (UTC in ms) at which the first reservation starts
+   */
+  public long getEarliestStartTime();
+
+  /**
+   * Returns the time (UTC in ms) at which the last reservation terminates
+   * 
+   * @return the time (UTC in ms) at which the last reservation terminates
+   */
+  public long getLastEndTime();
+
+}

+ 332 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/RLESparseResourceAllocation.java

@@ -0,0 +1,332 @@
+package org.apache.hadoop.yarn.server.resourcemanager.reservation;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.NavigableMap;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.hadoop.yarn.api.records.ReservationRequest;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.util.Records;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.Resources;
+
+import com.google.gson.stream.JsonWriter;
+
+/**
+ * This is a run length encoded sparse data structure that maintains resource
+ * allocations over time
+ */
+public class RLESparseResourceAllocation {
+
+  private static final int THRESHOLD = 100;
+  private static final Resource ZERO_RESOURCE = Resource.newInstance(0, 0);
+
+  private TreeMap<Long, Resource> cumulativeCapacity =
+      new TreeMap<Long, Resource>();
+
+  private final ReentrantReadWriteLock readWriteLock =
+      new ReentrantReadWriteLock();
+  private final Lock readLock = readWriteLock.readLock();
+  private final Lock writeLock = readWriteLock.writeLock();
+
+  private final ResourceCalculator resourceCalculator;
+  private final Resource minAlloc;
+
+  public RLESparseResourceAllocation(ResourceCalculator resourceCalculator,
+      Resource minAlloc) {
+    this.resourceCalculator = resourceCalculator;
+    this.minAlloc = minAlloc;
+  }
+
+  private boolean isSameAsPrevious(Long key, Resource capacity) {
+    Entry<Long, Resource> previous = cumulativeCapacity.lowerEntry(key);
+    return (previous != null && previous.getValue().equals(capacity));
+  }
+
+  private boolean isSameAsNext(Long key, Resource capacity) {
+    Entry<Long, Resource> next = cumulativeCapacity.higherEntry(key);
+    return (next != null && next.getValue().equals(capacity));
+  }
+
+  /**
+   * Add a resource for the specified interval
+   * 
+   * @param reservationInterval the interval for which the resource is to be
+   *          added
+   * @param capacity the resource to be added
+   * @return true if addition is successful, false otherwise
+   */
+  public boolean addInterval(ReservationInterval reservationInterval,
+      ReservationRequest capacity) {
+    Resource totCap =
+        Resources.multiply(capacity.getCapability(),
+            (float) capacity.getNumContainers());
+    if (totCap.equals(ZERO_RESOURCE)) {
+      return true;
+    }
+    writeLock.lock();
+    try {
+      long startKey = reservationInterval.getStartTime();
+      long endKey = reservationInterval.getEndTime();
+      NavigableMap<Long, Resource> ticks =
+          cumulativeCapacity.headMap(endKey, false);
+      if (ticks != null && !ticks.isEmpty()) {
+        Resource updatedCapacity = Resource.newInstance(0, 0);
+        Entry<Long, Resource> lowEntry = ticks.floorEntry(startKey);
+        if (lowEntry == null) {
+          // This is the earliest starting interval
+          cumulativeCapacity.put(startKey, totCap);
+        } else {
+          updatedCapacity = Resources.add(lowEntry.getValue(), totCap);
+          // Add a new tick only if the updated value is different
+          // from the previous tick
+          if ((startKey == lowEntry.getKey())
+              && (isSameAsPrevious(lowEntry.getKey(), updatedCapacity))) {
+            cumulativeCapacity.remove(lowEntry.getKey());
+          } else {
+            cumulativeCapacity.put(startKey, updatedCapacity);
+          }
+        }
+        // Increase all the capacities of overlapping intervals
+        Set<Entry<Long, Resource>> overlapSet =
+            ticks.tailMap(startKey, false).entrySet();
+        for (Entry<Long, Resource> entry : overlapSet) {
+          updatedCapacity = Resources.add(entry.getValue(), totCap);
+          entry.setValue(updatedCapacity);
+        }
+      } else {
+        // This is the first interval to be added
+        cumulativeCapacity.put(startKey, totCap);
+      }
+      Resource nextTick = cumulativeCapacity.get(endKey);
+      if (nextTick != null) {
+        // If there is overlap, remove the duplicate entry
+        if (isSameAsPrevious(endKey, nextTick)) {
+          cumulativeCapacity.remove(endKey);
+        }
+      } else {
+        // Decrease capacity as this is end of the interval
+        cumulativeCapacity.put(endKey, Resources.subtract(cumulativeCapacity
+            .floorEntry(endKey).getValue(), totCap));
+      }
+      return true;
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  /**
+   * Add multiple resources for the specified interval
+   * 
+   * @param reservationInterval the interval for which the resource is to be
+   *          added
+   * @param ReservationRequests the resources to be added
+   * @param clusterResource the total resources in the cluster
+   * @return true if addition is successful, false otherwise
+   */
+  public boolean addCompositeInterval(ReservationInterval reservationInterval,
+      List<ReservationRequest> ReservationRequests, Resource clusterResource) {
+    ReservationRequest aggregateReservationRequest =
+        Records.newRecord(ReservationRequest.class);
+    Resource capacity = Resource.newInstance(0, 0);
+    for (ReservationRequest ReservationRequest : ReservationRequests) {
+      Resources.addTo(capacity, Resources.multiply(
+          ReservationRequest.getCapability(),
+          ReservationRequest.getNumContainers()));
+    }
+    aggregateReservationRequest.setNumContainers((int) Math.ceil(Resources
+        .divide(resourceCalculator, clusterResource, capacity, minAlloc)));
+    aggregateReservationRequest.setCapability(minAlloc);
+
+    return addInterval(reservationInterval, aggregateReservationRequest);
+  }
+
+  /**
+   * Removes a resource for the specified interval
+   * 
+   * @param reservationInterval the interval for which the resource is to be
+   *          removed
+   * @param capacity the resource to be removed
+   * @return true if removal is successful, false otherwise
+   */
+  public boolean removeInterval(ReservationInterval reservationInterval,
+      ReservationRequest capacity) {
+    Resource totCap =
+        Resources.multiply(capacity.getCapability(),
+            (float) capacity.getNumContainers());
+    if (totCap.equals(ZERO_RESOURCE)) {
+      return true;
+    }
+    writeLock.lock();
+    try {
+      long startKey = reservationInterval.getStartTime();
+      long endKey = reservationInterval.getEndTime();
+      // update the start key
+      NavigableMap<Long, Resource> ticks =
+          cumulativeCapacity.headMap(endKey, false);
+      // Decrease all the capacities of overlapping intervals
+      SortedMap<Long, Resource> overlapSet = ticks.tailMap(startKey);
+      if (overlapSet != null && !overlapSet.isEmpty()) {
+        Resource updatedCapacity = Resource.newInstance(0, 0);
+        long currentKey = -1;
+        for (Iterator<Entry<Long, Resource>> overlapEntries =
+            overlapSet.entrySet().iterator(); overlapEntries.hasNext();) {
+          Entry<Long, Resource> entry = overlapEntries.next();
+          currentKey = entry.getKey();
+          updatedCapacity = Resources.subtract(entry.getValue(), totCap);
+          // update each entry between start and end key
+          cumulativeCapacity.put(currentKey, updatedCapacity);
+        }
+        // Remove the first overlap entry if it is same as previous after
+        // updation
+        Long firstKey = overlapSet.firstKey();
+        if (isSameAsPrevious(firstKey, overlapSet.get(firstKey))) {
+          cumulativeCapacity.remove(firstKey);
+        }
+        // Remove the next entry if it is same as end entry after updation
+        if ((currentKey != -1) && (isSameAsNext(currentKey, updatedCapacity))) {
+          cumulativeCapacity.remove(cumulativeCapacity.higherKey(currentKey));
+        }
+      }
+      return true;
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  /**
+   * Returns the capacity, i.e. total resources allocated at the specified point
+   * of time
+   * 
+   * @param tick the time (UTC in ms) at which the capacity is requested
+   * @return the resources allocated at the specified time
+   */
+  public Resource getCapacityAtTime(long tick) {
+    readLock.lock();
+    try {
+      Entry<Long, Resource> closestStep = cumulativeCapacity.floorEntry(tick);
+      if (closestStep != null) {
+        return Resources.clone(closestStep.getValue());
+      }
+      return Resources.clone(ZERO_RESOURCE);
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  /**
+   * Get the timestamp of the earliest resource allocation
+   * 
+   * @return the timestamp of the first resource allocation
+   */
+  public long getEarliestStartTime() {
+    readLock.lock();
+    try {
+      if (cumulativeCapacity.isEmpty()) {
+        return -1;
+      } else {
+        return cumulativeCapacity.firstKey();
+      }
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  /**
+   * Get the timestamp of the latest resource allocation
+   * 
+   * @return the timestamp of the last resource allocation
+   */
+  public long getLatestEndTime() {
+    readLock.lock();
+    try {
+      if (cumulativeCapacity.isEmpty()) {
+        return -1;
+      } else {
+        return cumulativeCapacity.lastKey();
+      }
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  /**
+   * Returns true if there are no non-zero entries
+   * 
+   * @return true if there are no allocations or false otherwise
+   */
+  public boolean isEmpty() {
+    readLock.lock();
+    try {
+      if (cumulativeCapacity.isEmpty()) {
+        return true;
+      }
+      // Deletion leaves a single zero entry so check for that
+      if (cumulativeCapacity.size() == 1) {
+        return cumulativeCapacity.firstEntry().getValue().equals(ZERO_RESOURCE);
+      }
+      return false;
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder ret = new StringBuilder();
+    readLock.lock();
+    try {
+      if (cumulativeCapacity.size() > THRESHOLD) {
+        ret.append("Number of steps: ").append(cumulativeCapacity.size())
+            .append(" earliest entry: ").append(cumulativeCapacity.firstKey())
+            .append(" latest entry: ").append(cumulativeCapacity.lastKey());
+      } else {
+        for (Map.Entry<Long, Resource> r : cumulativeCapacity.entrySet()) {
+          ret.append(r.getKey()).append(": ").append(r.getValue())
+              .append("\n ");
+        }
+      }
+      return ret.toString();
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  /**
+   * Returns the JSON string representation of the current resources allocated
+   * over time
+   * 
+   * @return the JSON string representation of the current resources allocated
+   *         over time
+   */
+  public String toMemJSONString() {
+    StringWriter json = new StringWriter();
+    JsonWriter jsonWriter = new JsonWriter(json);
+    readLock.lock();
+    try {
+      jsonWriter.beginObject();
+      // jsonWriter.name("timestamp").value("resource");
+      for (Map.Entry<Long, Resource> r : cumulativeCapacity.entrySet()) {
+        jsonWriter.name(r.getKey().toString()).value(r.getValue().toString());
+      }
+      jsonWriter.endObject();
+      jsonWriter.close();
+      return json.toString();
+    } catch (IOException e) {
+      // This should not happen
+      return "";
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+}

+ 104 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationAllocation.java

@@ -0,0 +1,104 @@
+package org.apache.hadoop.yarn.server.resourcemanager.reservation;
+
+import java.util.Map;
+
+import org.apache.hadoop.yarn.api.records.ReservationDefinition;
+import org.apache.hadoop.yarn.api.records.ReservationId;
+import org.apache.hadoop.yarn.api.records.ReservationRequest;
+import org.apache.hadoop.yarn.api.records.Resource;
+
+/**
+ * A ReservationAllocation represents a concrete allocation of resources over
+ * time that satisfy a certain {@link ReservationDefinition}. This is used
+ * internally by a {@link Plan} to store information about how each of the
+ * accepted {@link ReservationDefinition} have been allocated.
+ */
+public interface ReservationAllocation extends
+    Comparable<ReservationAllocation> {
+
+  /**
+   * Returns the unique identifier {@link ReservationId} that represents the
+   * reservation
+   * 
+   * @return reservationId the unique identifier {@link ReservationId} that
+   *         represents the reservation
+   */
+  public ReservationId getReservationId();
+
+  /**
+   * Returns the original {@link ReservationDefinition} submitted by the client
+   * 
+   * @return
+   */
+  public ReservationDefinition getReservationDefinition();
+
+  /**
+   * Returns the time at which the reservation is activated
+   * 
+   * @return the time at which the reservation is activated
+   */
+  public long getStartTime();
+
+  /**
+   * Returns the time at which the reservation terminates
+   * 
+   * @return the time at which the reservation terminates
+   */
+  public long getEndTime();
+
+  /**
+   * Returns the map of resources requested against the time interval for which
+   * they were
+   * 
+   * @return the allocationRequests the map of resources requested against the
+   *         time interval for which they were
+   */
+  public Map<ReservationInterval, ReservationRequest> getAllocationRequests();
+
+  /**
+   * Return a string identifying the plan to which the reservation belongs
+   * 
+   * @return the plan to which the reservation belongs
+   */
+  public String getPlanName();
+
+  /**
+   * Returns the user who requested the reservation
+   * 
+   * @return the user who requested the reservation
+   */
+  public String getUser();
+
+  /**
+   * Returns whether the reservation has gang semantics or not
+   * 
+   * @return true if there is a gang request, false otherwise
+   */
+  public boolean containsGangs();
+
+  /**
+   * Sets the time at which the reservation was accepted by the system
+   * 
+   * @param acceptedAt the time at which the reservation was accepted by the
+   *          system
+   */
+  public void setAcceptanceTimestamp(long acceptedAt);
+
+  /**
+   * Returns the time at which the reservation was accepted by the system
+   * 
+   * @return the time at which the reservation was accepted by the system
+   */
+  public long getAcceptanceTime();
+
+  /**
+   * Returns the capacity represented by cumulative resources reserved by the
+   * reservation at the specified point of time
+   * 
+   * @param tick the time (UTC in ms) for which the reserved resources are
+   *          requested
+   * @return the resources reserved at the specified time
+   */
+  public Resource getResourcesAtTime(long tick);
+
+}

+ 67 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationInterval.java

@@ -0,0 +1,67 @@
+package org.apache.hadoop.yarn.server.resourcemanager.reservation;
+
+/**
+ * This represents the time duration of the reservation
+ * 
+ */
+public class ReservationInterval implements Comparable<ReservationInterval> {
+
+  private final long startTime;
+
+  private final long endTime;
+
+  public ReservationInterval(long startTime, long endTime) {
+    this.startTime = startTime;
+    this.endTime = endTime;
+  }
+
+  /**
+   * Get the start time of the reservation interval
+   * 
+   * @return the startTime
+   */
+  public long getStartTime() {
+    return startTime;
+  }
+
+  /**
+   * Get the end time of the reservation interval
+   * 
+   * @return the endTime
+   */
+  public long getEndTime() {
+    return endTime;
+  }
+
+  /**
+   * Returns whether the interval is active at the specified instant of time
+   * 
+   * @param tick the instance of the time to check
+   * @return true if active, false otherwise
+   */
+  public boolean isOverlap(long tick) {
+    return (startTime <= tick && tick <= endTime);
+  }
+
+  @Override
+  public int compareTo(ReservationInterval anotherInterval) {
+    long diff = 0;
+    if (startTime == anotherInterval.getStartTime()) {
+      diff = endTime - anotherInterval.getEndTime();
+    } else {
+      diff = startTime - anotherInterval.getStartTime();
+    }
+    if (diff < 0) {
+      return -1;
+    } else if (diff > 0) {
+      return 1;
+    } else {
+      return 0;
+    }
+  }
+
+  public String toString() {
+    return "[" + startTime + ", " + endTime + "]";
+  }
+
+}

+ 25 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/exceptions/PlanningException.java

@@ -0,0 +1,25 @@
+package org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions;
+
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
+
+/**
+ * Exception thrown by the admission control subsystem when there is a problem
+ * in trying to find an allocation for a user {@link ReservationSubmissionRequest}.
+ */
+public class PlanningException extends Exception {
+
+  private static final long serialVersionUID = -684069387367879218L;
+
+  public PlanningException(String message) {
+    super(message);
+  }
+
+  public PlanningException(Throwable cause) {
+    super(cause);
+  }
+
+  public PlanningException(String message, Throwable cause) {
+    super(message, cause);
+  }
+
+}

+ 210 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java

@@ -0,0 +1,210 @@
+package org.apache.hadoop.yarn.server.resourcemanager.reservation;
+
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Random;
+import java.util.TreeMap;
+
+import org.apache.hadoop.yarn.api.records.ReservationDefinition;
+import org.apache.hadoop.yarn.api.records.ReservationId;
+import org.apache.hadoop.yarn.api.records.ReservationRequest;
+import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter;
+import org.apache.hadoop.yarn.api.records.ReservationRequests;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.impl.pb.ReservationDefinitionPBImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.ReservationRequestsPBImpl;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
+import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
+import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
+import org.junit.Assert;
+import org.mockito.Mockito;
+
+public class ReservationSystemTestUtil {
+
+  private static Random rand = new Random();
+
+  public final static String reservationQ = "dedicated";
+
+  public static ReservationId getNewReservationId() {
+    return ReservationId.newInstance(rand.nextLong(), rand.nextLong());
+  }
+
+  public CapacityScheduler mockCapacityScheduler(int numContainers)
+      throws IOException {
+    // stolen from TestCapacityScheduler
+    CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
+    setupQueueConfiguration(conf);
+
+    CapacityScheduler cs = Mockito.spy(new CapacityScheduler());
+    cs.setConf(new YarnConfiguration());
+    RMContext mockRmContext =
+        Mockito.spy(new RMContextImpl(null, null, null, null, null, null,
+            new RMContainerTokenSecretManager(conf),
+            new NMTokenSecretManagerInRM(conf),
+            new ClientToAMTokenSecretManagerInRM(), null));
+    cs.setRMContext(mockRmContext);
+    try {
+      cs.serviceInit(conf);
+    } catch (Exception e) {
+      Assert.fail(e.getMessage());
+    }
+    when(mockRmContext.getScheduler()).thenReturn(cs);
+    Resource r = Resource.newInstance(numContainers * 1024, numContainers);
+    doReturn(r).when(cs).getClusterResource();
+    return cs;
+  }
+
+  public static void setupQueueConfiguration(CapacitySchedulerConfiguration conf) {
+    // Define default queue
+    final String defQ = CapacitySchedulerConfiguration.ROOT + ".default";
+    conf.setCapacity(defQ, 10);
+
+    // Define top-level queues
+    conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {
+        "default", "a", reservationQ });
+
+    final String A = CapacitySchedulerConfiguration.ROOT + ".a";
+    conf.setCapacity(A, 10);
+
+    final String dedicated =
+        CapacitySchedulerConfiguration.ROOT
+            + CapacitySchedulerConfiguration.DOT + reservationQ;
+    conf.setCapacity(dedicated, 80);
+    // Set as reservation queue
+    conf.setReservableQueue(dedicated, true);
+
+    // Define 2nd-level queues
+    final String A1 = A + ".a1";
+    final String A2 = A + ".a2";
+    conf.setQueues(A, new String[] { "a1", "a2" });
+    conf.setCapacity(A1, 30);
+    conf.setCapacity(A2, 70);
+  }
+
+  public String getFullReservationQueueName() {
+    return CapacitySchedulerConfiguration.ROOT
+        + CapacitySchedulerConfiguration.DOT + reservationQ;
+  }
+
+  public String getreservationQueueName() {
+    return reservationQ;
+  }
+
+  public void updateQueueConfiguration(CapacitySchedulerConfiguration conf,
+      String newQ) {
+    // Define default queue
+    final String prefix =
+        CapacitySchedulerConfiguration.ROOT
+            + CapacitySchedulerConfiguration.DOT;
+    final String defQ = prefix + "default";
+    conf.setCapacity(defQ, 5);
+
+    // Define top-level queues
+    conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {
+        "default", "a", reservationQ, newQ });
+
+    final String A = prefix + "a";
+    conf.setCapacity(A, 5);
+
+    final String dedicated = prefix + reservationQ;
+    conf.setCapacity(dedicated, 80);
+    // Set as reservation queue
+    conf.setReservableQueue(dedicated, true);
+
+    conf.setCapacity(prefix + newQ, 10);
+    // Set as reservation queue
+    conf.setReservableQueue(prefix + newQ, true);
+
+    // Define 2nd-level queues
+    final String A1 = A + ".a1";
+    final String A2 = A + ".a2";
+    conf.setQueues(A, new String[] { "a1", "a2" });
+    conf.setCapacity(A1, 30);
+    conf.setCapacity(A2, 70);
+  }
+
+  public static ReservationDefinition generateRandomRR(Random rand, long i) {
+    rand.setSeed(i);
+    long now = System.currentTimeMillis();
+
+    // start time at random in the next 12 hours
+    long arrival = rand.nextInt(12 * 3600 * 1000);
+    // deadline at random in the next day
+    long deadline = arrival + rand.nextInt(24 * 3600 * 1000);
+
+    // create a request with a single atomic ask
+    ReservationDefinition rr = new ReservationDefinitionPBImpl();
+    rr.setArrival(now + arrival);
+    rr.setDeadline(now + deadline);
+
+    int gang = 1 + rand.nextInt(9);
+    int par = (rand.nextInt(1000) + 1) * gang;
+    long dur = rand.nextInt(2 * 3600 * 1000); // random duration within 2h
+    ReservationRequest r =
+        ReservationRequest.newInstance(Resource.newInstance(1024, 1), par,
+            gang, dur);
+    ReservationRequests reqs = new ReservationRequestsPBImpl();
+    reqs.setReservationResources(Collections.singletonList(r));
+    rand.nextInt(3);
+    ReservationRequestInterpreter[] type =
+        ReservationRequestInterpreter.values();
+    reqs.setInterpreter(type[rand.nextInt(type.length)]);
+    rr.setReservationRequests(reqs);
+
+    return rr;
+
+  }
+
+  public static ReservationDefinition generateBigRR(Random rand, long i) {
+    rand.setSeed(i);
+    long now = System.currentTimeMillis();
+
+    // start time at random in the next 2 hours
+    long arrival = rand.nextInt(2 * 3600 * 1000);
+    // deadline at random in the next day
+    long deadline = rand.nextInt(24 * 3600 * 1000);
+
+    // create a request with a single atomic ask
+    ReservationDefinition rr = new ReservationDefinitionPBImpl();
+    rr.setArrival(now + arrival);
+    rr.setDeadline(now + deadline);
+
+    int gang = 1;
+    int par = 100000; // 100k tasks
+    long dur = rand.nextInt(60 * 1000); // 1min tasks
+    ReservationRequest r =
+        ReservationRequest.newInstance(Resource.newInstance(1024, 1), par,
+            gang, dur);
+    ReservationRequests reqs = new ReservationRequestsPBImpl();
+    reqs.setReservationResources(Collections.singletonList(r));
+    rand.nextInt(3);
+    ReservationRequestInterpreter[] type =
+        ReservationRequestInterpreter.values();
+    reqs.setInterpreter(type[rand.nextInt(type.length)]);
+    rr.setReservationRequests(reqs);
+
+    return rr;
+  }
+
+  public static Map<ReservationInterval, ReservationRequest> generateAllocation(
+      long startTime, long step, int[] alloc) {
+    Map<ReservationInterval, ReservationRequest> req =
+        new TreeMap<ReservationInterval, ReservationRequest>();
+    for (int i = 0; i < alloc.length; i++) {
+      req.put(new ReservationInterval(startTime + i * step, startTime + (i + 1)
+          * step), ReservationRequest.newInstance(
+          Resource.newInstance(1024, 1), alloc[i]));
+    }
+    return req;
+  }
+
+}

+ 477 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestInMemoryPlan.java

@@ -0,0 +1,477 @@
+package org.apache.hadoop.yarn.server.resourcemanager.reservation;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.yarn.api.records.ReservationDefinition;
+import org.apache.hadoop.yarn.api.records.ReservationId;
+import org.apache.hadoop.yarn.api.records.ReservationRequest;
+import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter;
+import org.apache.hadoop.yarn.api.records.ReservationRequests;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.impl.pb.ReservationDefinitionPBImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.ReservationRequestsPBImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestInMemoryPlan {
+
+  private String user = "yarn";
+  private String planName = "test-reservation";
+  private ResourceCalculator resCalc;
+  private Resource minAlloc;
+  private Resource maxAlloc;
+  private Resource totalCapacity;
+
+  private Clock clock;
+  private QueueMetrics queueMetrics;
+  private SharingPolicy policy;
+  private ReservationAgent agent;
+  private Planner replanner;
+
+  @Before
+  public void setUp() throws PlanningException {
+    resCalc = new DefaultResourceCalculator();
+    minAlloc = Resource.newInstance(1024, 1);
+    maxAlloc = Resource.newInstance(64 * 1024, 20);
+    totalCapacity = Resource.newInstance(100 * 1024, 100);
+
+    clock = mock(Clock.class);
+    queueMetrics = mock(QueueMetrics.class);
+    policy = mock(SharingPolicy.class);
+    replanner = mock(Planner.class);
+
+    when(clock.getTime()).thenReturn(1L);
+  }
+
+  @After
+  public void tearDown() {
+    resCalc = null;
+    minAlloc = null;
+    maxAlloc = null;
+    totalCapacity = null;
+
+    clock = null;
+    queueMetrics = null;
+    policy = null;
+    replanner = null;
+  }
+
+  @Test
+  public void testAddReservation() {
+    Plan plan =
+        new InMemoryPlan(queueMetrics, policy, agent, totalCapacity, 1L,
+            resCalc, minAlloc, maxAlloc, planName, replanner, true);
+    ReservationId reservationID =
+        ReservationSystemTestUtil.getNewReservationId();
+    int[] alloc = { 10, 10, 10, 10, 10, 10 };
+    int start = 100;
+    Map<ReservationInterval, ReservationRequest> allocations =
+        generateAllocation(start, alloc, false);
+    ReservationDefinition rDef =
+        createSimpleReservationDefinition(start, start + alloc.length,
+            alloc.length, allocations.values());
+    ReservationAllocation rAllocation =
+        new InMemoryReservationAllocation(reservationID, rDef, user, planName,
+            start, start + alloc.length, allocations, resCalc, minAlloc);
+    Assert.assertNull(plan.getReservationById(reservationID));
+    try {
+      plan.addReservation(rAllocation);
+    } catch (PlanningException e) {
+      Assert.fail(e.getMessage());
+    }
+    doAssertions(plan, rAllocation);
+    for (int i = 0; i < alloc.length; i++) {
+      Assert.assertEquals(Resource.newInstance(1024 * (alloc[i]), (alloc[i])),
+          plan.getTotalCommittedResources(start + i));
+      Assert.assertEquals(Resource.newInstance(1024 * (alloc[i]), (alloc[i])),
+          plan.getConsumptionForUser(user, start + i));
+    }
+  }
+
+  @Test
+  public void testAddEmptyReservation() {
+    Plan plan =
+        new InMemoryPlan(queueMetrics, policy, agent, totalCapacity, 1L,
+            resCalc, minAlloc, maxAlloc, planName, replanner, true);
+    ReservationId reservationID =
+        ReservationSystemTestUtil.getNewReservationId();
+    int[] alloc = {};
+    int start = 100;
+    Map<ReservationInterval, ReservationRequest> allocations =
+        new HashMap<ReservationInterval, ReservationRequest>();
+    ReservationDefinition rDef =
+        createSimpleReservationDefinition(start, start + alloc.length,
+            alloc.length, allocations.values());
+    ReservationAllocation rAllocation =
+        new InMemoryReservationAllocation(reservationID, rDef, user, planName,
+            start, start + alloc.length, allocations, resCalc, minAlloc);
+    Assert.assertNull(plan.getReservationById(reservationID));
+    try {
+      plan.addReservation(rAllocation);
+    } catch (PlanningException e) {
+      Assert.fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void testAddReservationAlreadyExists() {
+    // First add a reservation
+    Plan plan =
+        new InMemoryPlan(queueMetrics, policy, agent, totalCapacity, 1L,
+            resCalc, minAlloc, maxAlloc, planName, replanner, true);
+    ReservationId reservationID =
+        ReservationSystemTestUtil.getNewReservationId();
+    int[] alloc = { 10, 10, 10, 10, 10, 10 };
+    int start = 100;
+    Map<ReservationInterval, ReservationRequest> allocations =
+        generateAllocation(start, alloc, false);
+    ReservationDefinition rDef =
+        createSimpleReservationDefinition(start, start + alloc.length,
+            alloc.length, allocations.values());
+    ReservationAllocation rAllocation =
+        new InMemoryReservationAllocation(reservationID, rDef, user, planName,
+            start, start + alloc.length, allocations, resCalc, minAlloc);
+    Assert.assertNull(plan.getReservationById(reservationID));
+    try {
+      plan.addReservation(rAllocation);
+    } catch (PlanningException e) {
+      Assert.fail(e.getMessage());
+    }
+    doAssertions(plan, rAllocation);
+    for (int i = 0; i < alloc.length; i++) {
+      Assert.assertEquals(Resource.newInstance(1024 * (alloc[i]), (alloc[i])),
+          plan.getTotalCommittedResources(start + i));
+      Assert.assertEquals(Resource.newInstance(1024 * (alloc[i]), (alloc[i])),
+          plan.getConsumptionForUser(user, start + i));
+    }
+
+    // Try to add it again
+    try {
+      plan.addReservation(rAllocation);
+      Assert.fail("Add should fail as it already exists");
+    } catch (IllegalArgumentException e) {
+      Assert.assertTrue(e.getMessage().endsWith("already exists"));
+    } catch (PlanningException e) {
+      Assert.fail(e.getMessage());
+    }
+    doAssertions(plan, rAllocation);
+  }
+
+  @Test
+  public void testUpdateReservation() {
+    Plan plan =
+        new InMemoryPlan(queueMetrics, policy, agent, totalCapacity, 1L,
+            resCalc, minAlloc, maxAlloc, planName, replanner, true);
+    ReservationId reservationID =
+        ReservationSystemTestUtil.getNewReservationId();
+    // First add a reservation
+    int[] alloc = { 10, 10, 10, 10, 10, 10 };
+    int start = 100;
+    Map<ReservationInterval, ReservationRequest> allocations =
+        generateAllocation(start, alloc, false);
+    ReservationDefinition rDef =
+        createSimpleReservationDefinition(start, start + alloc.length,
+            alloc.length, allocations.values());
+    ReservationAllocation rAllocation =
+        new InMemoryReservationAllocation(reservationID, rDef, user, planName,
+            start, start + alloc.length, allocations, resCalc, minAlloc);
+    Assert.assertNull(plan.getReservationById(reservationID));
+    try {
+      plan.addReservation(rAllocation);
+    } catch (PlanningException e) {
+      Assert.fail(e.getMessage());
+    }
+    doAssertions(plan, rAllocation);
+    for (int i = 0; i < alloc.length; i++) {
+      Assert.assertEquals(Resource.newInstance(1024 * (alloc[i]), (alloc[i])),
+          plan.getTotalCommittedResources(start + i));
+      Assert.assertEquals(Resource.newInstance(1024 * (alloc[i]), (alloc[i])),
+          plan.getConsumptionForUser(user, start + i));
+    }
+
+    // Now update it
+    start = 110;
+    int[] updatedAlloc = { 0, 5, 10, 10, 5, 0 };
+    allocations = generateAllocation(start, updatedAlloc, true);
+    rDef =
+        createSimpleReservationDefinition(start, start + updatedAlloc.length,
+            updatedAlloc.length, allocations.values());
+    rAllocation =
+        new InMemoryReservationAllocation(reservationID, rDef, user, planName,
+            start, start + updatedAlloc.length, allocations, resCalc, minAlloc);
+    try {
+      plan.updateReservation(rAllocation);
+    } catch (PlanningException e) {
+      Assert.fail(e.getMessage());
+    }
+    doAssertions(plan, rAllocation);
+    for (int i = 0; i < updatedAlloc.length; i++) {
+      Assert.assertEquals(
+          Resource.newInstance(1024 * (updatedAlloc[i] + i), updatedAlloc[i]
+              + i), plan.getTotalCommittedResources(start + i));
+      Assert.assertEquals(
+          Resource.newInstance(1024 * (updatedAlloc[i] + i), updatedAlloc[i]
+              + i), plan.getConsumptionForUser(user, start + i));
+    }
+  }
+
+  @Test
+  public void testUpdateNonExistingReservation() {
+    Plan plan =
+        new InMemoryPlan(queueMetrics, policy, agent, totalCapacity, 1L,
+            resCalc, minAlloc, maxAlloc, planName, replanner, true);
+    ReservationId reservationID =
+        ReservationSystemTestUtil.getNewReservationId();
+    // Try to update a reservation without adding
+    int[] alloc = { 10, 10, 10, 10, 10, 10 };
+    int start = 100;
+    Map<ReservationInterval, ReservationRequest> allocations =
+        generateAllocation(start, alloc, false);
+    ReservationDefinition rDef =
+        createSimpleReservationDefinition(start, start + alloc.length,
+            alloc.length, allocations.values());
+    ReservationAllocation rAllocation =
+        new InMemoryReservationAllocation(reservationID, rDef, user, planName,
+            start, start + alloc.length, allocations, resCalc, minAlloc);
+    Assert.assertNull(plan.getReservationById(reservationID));
+    try {
+      plan.updateReservation(rAllocation);
+      Assert.fail("Update should fail as it does not exist in the plan");
+    } catch (IllegalArgumentException e) {
+      Assert.assertTrue(e.getMessage().endsWith("does not exist in the plan"));
+    } catch (PlanningException e) {
+      Assert.fail(e.getMessage());
+    }
+    Assert.assertNull(plan.getReservationById(reservationID));
+  }
+
+  @Test
+  public void testDeleteReservation() {
+    // First add a reservation
+    Plan plan =
+        new InMemoryPlan(queueMetrics, policy, agent, totalCapacity, 1L,
+            resCalc, minAlloc, maxAlloc, planName, replanner, true);
+    ReservationId reservationID =
+        ReservationSystemTestUtil.getNewReservationId();
+    int[] alloc = { 10, 10, 10, 10, 10, 10 };
+    int start = 100;
+    Map<ReservationInterval, ReservationRequest> allocations =
+        generateAllocation(start, alloc, true);
+    ReservationDefinition rDef =
+        createSimpleReservationDefinition(start, start + alloc.length,
+            alloc.length, allocations.values());
+    ReservationAllocation rAllocation =
+        new InMemoryReservationAllocation(reservationID, rDef, user, planName,
+            start, start + alloc.length, allocations, resCalc, minAlloc);
+    Assert.assertNull(plan.getReservationById(reservationID));
+    try {
+      plan.addReservation(rAllocation);
+    } catch (PlanningException e) {
+      Assert.fail(e.getMessage());
+    }
+    doAssertions(plan, rAllocation);
+    for (int i = 0; i < alloc.length; i++) {
+      Assert.assertEquals(
+          Resource.newInstance(1024 * (alloc[i] + i), (alloc[i] + i)),
+          plan.getTotalCommittedResources(start + i));
+      Assert.assertEquals(
+          Resource.newInstance(1024 * (alloc[i] + i), (alloc[i] + i)),
+          plan.getConsumptionForUser(user, start + i));
+    }
+
+    // Now delete it
+    try {
+      plan.deleteReservation(reservationID);
+    } catch (PlanningException e) {
+      Assert.fail(e.getMessage());
+    }
+    Assert.assertNull(plan.getReservationById(reservationID));
+    for (int i = 0; i < alloc.length; i++) {
+      Assert.assertEquals(Resource.newInstance(0, 0),
+          plan.getTotalCommittedResources(start + i));
+      Assert.assertEquals(Resource.newInstance(0, 0),
+          plan.getConsumptionForUser(user, start + i));
+    }
+  }
+
+  @Test
+  public void testDeleteNonExistingReservation() {
+    Plan plan =
+        new InMemoryPlan(queueMetrics, policy, agent, totalCapacity, 1L,
+            resCalc, minAlloc, maxAlloc, planName, replanner, true);
+    ReservationId reservationID =
+        ReservationSystemTestUtil.getNewReservationId();
+    // Try to delete a reservation without adding
+    Assert.assertNull(plan.getReservationById(reservationID));
+    try {
+      plan.deleteReservation(reservationID);
+      Assert.fail("Delete should fail as it does not exist in the plan");
+    } catch (IllegalArgumentException e) {
+      Assert.assertTrue(e.getMessage().endsWith("does not exist in the plan"));
+    } catch (PlanningException e) {
+      Assert.fail(e.getMessage());
+    }
+    Assert.assertNull(plan.getReservationById(reservationID));
+  }
+
+  @Test
+  public void testArchiveCompletedReservations() {
+    Plan plan =
+        new InMemoryPlan(queueMetrics, policy, agent, totalCapacity, 1L,
+            resCalc, minAlloc, maxAlloc, planName, replanner, true);
+    ReservationId reservationID1 =
+        ReservationSystemTestUtil.getNewReservationId();
+    // First add a reservation
+    int[] alloc1 = { 10, 10, 10, 10, 10, 10 };
+    int start = 100;
+    Map<ReservationInterval, ReservationRequest> allocations1 =
+        generateAllocation(start, alloc1, false);
+    ReservationDefinition rDef1 =
+        createSimpleReservationDefinition(start, start + alloc1.length,
+            alloc1.length, allocations1.values());
+    ReservationAllocation rAllocation =
+        new InMemoryReservationAllocation(reservationID1, rDef1, user,
+            planName, start, start + alloc1.length, allocations1, resCalc,
+            minAlloc);
+    Assert.assertNull(plan.getReservationById(reservationID1));
+    try {
+      plan.addReservation(rAllocation);
+    } catch (PlanningException e) {
+      Assert.fail(e.getMessage());
+    }
+    doAssertions(plan, rAllocation);
+    for (int i = 0; i < alloc1.length; i++) {
+      Assert.assertEquals(
+          Resource.newInstance(1024 * (alloc1[i]), (alloc1[i])),
+          plan.getTotalCommittedResources(start + i));
+      Assert.assertEquals(
+          Resource.newInstance(1024 * (alloc1[i]), (alloc1[i])),
+          plan.getConsumptionForUser(user, start + i));
+    }
+
+    // Now add another one
+    ReservationId reservationID2 =
+        ReservationSystemTestUtil.getNewReservationId();
+    int[] alloc2 = { 0, 5, 10, 5, 0 };
+    Map<ReservationInterval, ReservationRequest> allocations2 =
+        generateAllocation(start, alloc2, true);
+    ReservationDefinition rDef2 =
+        createSimpleReservationDefinition(start, start + alloc2.length,
+            alloc2.length, allocations2.values());
+    rAllocation =
+        new InMemoryReservationAllocation(reservationID2, rDef2, user,
+            planName, start, start + alloc2.length, allocations2, resCalc,
+            minAlloc);
+    Assert.assertNull(plan.getReservationById(reservationID2));
+    try {
+      plan.addReservation(rAllocation);
+    } catch (PlanningException e) {
+      Assert.fail(e.getMessage());
+    }
+    Assert.assertNotNull(plan.getReservationById(reservationID2));
+    for (int i = 0; i < alloc2.length; i++) {
+      Assert.assertEquals(
+          Resource.newInstance(1024 * (alloc1[i] + alloc2[i] + i), alloc1[i]
+              + alloc2[i] + i), plan.getTotalCommittedResources(start + i));
+      Assert.assertEquals(
+          Resource.newInstance(1024 * (alloc1[i] + alloc2[i] + i), alloc1[i]
+              + alloc2[i] + i), plan.getConsumptionForUser(user, start + i));
+    }
+
+    // Now archive completed reservations
+    when(clock.getTime()).thenReturn(106L);
+    when(policy.getValidWindow()).thenReturn(1L);
+    try {
+      // will only remove 2nd reservation as only that has fallen out of the
+      // archival window
+      plan.archiveCompletedReservations(clock.getTime());
+    } catch (PlanningException e) {
+      Assert.fail(e.getMessage());
+    }
+    Assert.assertNotNull(plan.getReservationById(reservationID1));
+    Assert.assertNull(plan.getReservationById(reservationID2));
+    for (int i = 0; i < alloc1.length; i++) {
+      Assert.assertEquals(
+          Resource.newInstance(1024 * (alloc1[i]), (alloc1[i])),
+          plan.getTotalCommittedResources(start + i));
+      Assert.assertEquals(
+          Resource.newInstance(1024 * (alloc1[i]), (alloc1[i])),
+          plan.getConsumptionForUser(user, start + i));
+    }
+    when(clock.getTime()).thenReturn(107L);
+    try {
+      // will remove 1st reservation also as it has fallen out of the archival
+      // window
+      plan.archiveCompletedReservations(clock.getTime());
+    } catch (PlanningException e) {
+      Assert.fail(e.getMessage());
+    }
+    Assert.assertNull(plan.getReservationById(reservationID1));
+    for (int i = 0; i < alloc1.length; i++) {
+      Assert.assertEquals(Resource.newInstance(0, 0),
+          plan.getTotalCommittedResources(start + i));
+      Assert.assertEquals(Resource.newInstance(0, 0),
+          plan.getConsumptionForUser(user, start + i));
+    }
+  }
+
+  private void doAssertions(Plan plan, ReservationAllocation rAllocation) {
+    ReservationId reservationID = rAllocation.getReservationId();
+    Assert.assertNotNull(plan.getReservationById(reservationID));
+    Assert.assertEquals(rAllocation, plan.getReservationById(reservationID));
+    Assert.assertTrue(((InMemoryPlan) plan).getAllReservations().size() == 1);
+    Assert.assertEquals(rAllocation.getEndTime(), plan.getLastEndTime());
+    Assert.assertEquals(totalCapacity, plan.getTotalCapacity());
+    Assert.assertEquals(minAlloc, plan.getMinimumAllocation());
+    Assert.assertEquals(maxAlloc, plan.getMaximumAllocation());
+    Assert.assertEquals(resCalc, plan.getResourceCalculator());
+    Assert.assertEquals(planName, plan.getQueueName());
+    Assert.assertTrue(plan.getMoveOnExpiry());
+  }
+
+  private ReservationDefinition createSimpleReservationDefinition(long arrival,
+      long deadline, long duration, Collection<ReservationRequest> resources) {
+    // create a request with a single atomic ask
+    ReservationDefinition rDef = new ReservationDefinitionPBImpl();
+    ReservationRequests reqs = new ReservationRequestsPBImpl();
+    reqs.setReservationResources(new ArrayList<ReservationRequest>(resources));
+    reqs.setInterpreter(ReservationRequestInterpreter.R_ALL);
+    rDef.setReservationRequests(reqs);
+    rDef.setArrival(arrival);
+    rDef.setDeadline(deadline);
+    return rDef;
+  }
+
+  private Map<ReservationInterval, ReservationRequest> generateAllocation(
+      int startTime, int[] alloc, boolean isStep) {
+    Map<ReservationInterval, ReservationRequest> req =
+        new HashMap<ReservationInterval, ReservationRequest>();
+    int numContainers = 0;
+    for (int i = 0; i < alloc.length; i++) {
+      if (isStep) {
+        numContainers = alloc[i] + i;
+      } else {
+        numContainers = alloc[i];
+      }
+      ReservationRequest rr =
+          ReservationRequest.newInstance(Resource.newInstance(1024, 1),
+              (numContainers));
+      req.put(new ReservationInterval(startTime + i, startTime + i + 1), rr);
+    }
+    return req;
+  }
+
+}

+ 206 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestInMemoryReservationAllocation.java

@@ -0,0 +1,206 @@
+package org.apache.hadoop.yarn.server.resourcemanager.reservation;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+
+import org.apache.hadoop.yarn.api.records.ReservationDefinition;
+import org.apache.hadoop.yarn.api.records.ReservationId;
+import org.apache.hadoop.yarn.api.records.ReservationRequest;
+import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter;
+import org.apache.hadoop.yarn.api.records.ReservationRequests;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.impl.pb.ReservationDefinitionPBImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.ReservationRequestsPBImpl;
+import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestInMemoryReservationAllocation {
+
+  private String user = "yarn";
+  private String planName = "test-reservation";
+  private ResourceCalculator resCalc;
+  private Resource minAlloc;
+
+  private Random rand = new Random();
+
+  @Before
+  public void setUp() {
+    resCalc = new DefaultResourceCalculator();
+    minAlloc = Resource.newInstance(1, 1);
+  }
+
+  @After
+  public void tearDown() {
+    user = null;
+    planName = null;
+    resCalc = null;
+    minAlloc = null;
+  }
+
+  @Test
+  public void testBlocks() {
+    ReservationId reservationID =
+        ReservationId.newInstance(rand.nextLong(), rand.nextLong());
+    int[] alloc = { 10, 10, 10, 10, 10, 10 };
+    int start = 100;
+    ReservationDefinition rDef =
+        createSimpleReservationDefinition(start, start + alloc.length + 1,
+            alloc.length);
+    Map<ReservationInterval, ReservationRequest> allocations =
+        generateAllocation(start, alloc, false, false);
+    ReservationAllocation rAllocation =
+        new InMemoryReservationAllocation(reservationID, rDef, user, planName,
+            start, start + alloc.length + 1, allocations, resCalc, minAlloc);
+    doAssertions(rAllocation, reservationID, rDef, allocations, start, alloc);
+    Assert.assertFalse(rAllocation.containsGangs());
+    for (int i = 0; i < alloc.length; i++) {
+      Assert.assertEquals(Resource.newInstance(1024 * (alloc[i]), (alloc[i])),
+          rAllocation.getResourcesAtTime(start + i));
+    }
+  }
+
+  @Test
+  public void testSteps() {
+    ReservationId reservationID =
+        ReservationId.newInstance(rand.nextLong(), rand.nextLong());
+    int[] alloc = { 10, 10, 10, 10, 10, 10 };
+    int start = 100;
+    ReservationDefinition rDef =
+        createSimpleReservationDefinition(start, start + alloc.length + 1,
+            alloc.length);
+    Map<ReservationInterval, ReservationRequest> allocations =
+        generateAllocation(start, alloc, true, false);
+    ReservationAllocation rAllocation =
+        new InMemoryReservationAllocation(reservationID, rDef, user, planName,
+            start, start + alloc.length + 1, allocations, resCalc, minAlloc);
+    doAssertions(rAllocation, reservationID, rDef, allocations, start, alloc);
+    Assert.assertFalse(rAllocation.containsGangs());
+    for (int i = 0; i < alloc.length; i++) {
+      Assert.assertEquals(
+          Resource.newInstance(1024 * (alloc[i] + i), (alloc[i] + i)),
+          rAllocation.getResourcesAtTime(start + i));
+    }
+  }
+
+  @Test
+  public void testSkyline() {
+    ReservationId reservationID =
+        ReservationId.newInstance(rand.nextLong(), rand.nextLong());
+    int[] alloc = { 0, 5, 10, 10, 5, 0 };
+    int start = 100;
+    ReservationDefinition rDef =
+        createSimpleReservationDefinition(start, start + alloc.length + 1,
+            alloc.length);
+    Map<ReservationInterval, ReservationRequest> allocations =
+        generateAllocation(start, alloc, true, false);
+    ReservationAllocation rAllocation =
+        new InMemoryReservationAllocation(reservationID, rDef, user, planName,
+            start, start + alloc.length + 1, allocations, resCalc, minAlloc);
+    doAssertions(rAllocation, reservationID, rDef, allocations, start, alloc);
+    Assert.assertFalse(rAllocation.containsGangs());
+    for (int i = 0; i < alloc.length; i++) {
+      Assert.assertEquals(
+          Resource.newInstance(1024 * (alloc[i] + i), (alloc[i] + i)),
+          rAllocation.getResourcesAtTime(start + i));
+    }
+  }
+
+  @Test
+  public void testZeroAlloaction() {
+    ReservationId reservationID =
+        ReservationId.newInstance(rand.nextLong(), rand.nextLong());
+    int[] alloc = {};
+    long start = 0;
+    ReservationDefinition rDef =
+        createSimpleReservationDefinition(start, start + alloc.length + 1,
+            alloc.length);
+    Map<ReservationInterval, ReservationRequest> allocations =
+        new HashMap<ReservationInterval, ReservationRequest>();
+    ReservationAllocation rAllocation =
+        new InMemoryReservationAllocation(reservationID, rDef, user, planName,
+            start, start + alloc.length + 1, allocations, resCalc, minAlloc);
+    doAssertions(rAllocation, reservationID, rDef, allocations, (int) start,
+        alloc);
+    Assert.assertFalse(rAllocation.containsGangs());
+  }
+
+  @Test
+  public void testGangAlloaction() {
+    ReservationId reservationID =
+        ReservationId.newInstance(rand.nextLong(), rand.nextLong());
+    int[] alloc = { 10, 10, 10, 10, 10, 10 };
+    int start = 100;
+    ReservationDefinition rDef =
+        createSimpleReservationDefinition(start, start + alloc.length + 1,
+            alloc.length);
+    Map<ReservationInterval, ReservationRequest> allocations =
+        generateAllocation(start, alloc, false, true);
+    ReservationAllocation rAllocation =
+        new InMemoryReservationAllocation(reservationID, rDef, user, planName,
+            start, start + alloc.length + 1, allocations, resCalc, minAlloc);
+    doAssertions(rAllocation, reservationID, rDef, allocations, start, alloc);
+    Assert.assertTrue(rAllocation.containsGangs());
+    for (int i = 0; i < alloc.length; i++) {
+      Assert.assertEquals(Resource.newInstance(1024 * (alloc[i]), (alloc[i])),
+          rAllocation.getResourcesAtTime(start + i));
+    }
+  }
+
+  private void doAssertions(ReservationAllocation rAllocation,
+      ReservationId reservationID, ReservationDefinition rDef,
+      Map<ReservationInterval, ReservationRequest> allocations, int start,
+      int[] alloc) {
+    Assert.assertEquals(reservationID, rAllocation.getReservationId());
+    Assert.assertEquals(rDef, rAllocation.getReservationDefinition());
+    Assert.assertEquals(allocations, rAllocation.getAllocationRequests());
+    Assert.assertEquals(user, rAllocation.getUser());
+    Assert.assertEquals(planName, rAllocation.getPlanName());
+    Assert.assertEquals(start, rAllocation.getStartTime());
+    Assert.assertEquals(start + alloc.length + 1, rAllocation.getEndTime());
+  }
+
+  private ReservationDefinition createSimpleReservationDefinition(long arrival,
+      long deadline, long duration) {
+    // create a request with a single atomic ask
+    ReservationRequest r =
+        ReservationRequest.newInstance(Resource.newInstance(1024, 1), 1, 1,
+            duration);
+    ReservationDefinition rDef = new ReservationDefinitionPBImpl();
+    ReservationRequests reqs = new ReservationRequestsPBImpl();
+    reqs.setReservationResources(Collections.singletonList(r));
+    reqs.setInterpreter(ReservationRequestInterpreter.R_ALL);
+    rDef.setReservationRequests(reqs);
+    rDef.setArrival(arrival);
+    rDef.setDeadline(deadline);
+    return rDef;
+  }
+
+  private Map<ReservationInterval, ReservationRequest> generateAllocation(
+      int startTime, int[] alloc, boolean isStep, boolean isGang) {
+    Map<ReservationInterval, ReservationRequest> req =
+        new HashMap<ReservationInterval, ReservationRequest>();
+    int numContainers = 0;
+    for (int i = 0; i < alloc.length; i++) {
+      if (isStep) {
+        numContainers = alloc[i] + i;
+      } else {
+        numContainers = alloc[i];
+      }
+      ReservationRequest rr =
+          ReservationRequest.newInstance(Resource.newInstance(1024, 1),
+              (numContainers));
+      if (isGang) {
+        rr.setConcurrency(numContainers);
+      }
+      req.put(new ReservationInterval(startTime + i, startTime + i + 1), rr);
+    }
+    return req;
+  }
+
+}

+ 169 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestRLESparseResourceAllocation.java

@@ -0,0 +1,169 @@
+package org.apache.hadoop.yarn.server.resourcemanager.reservation;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Random;
+import java.util.Set;
+
+import org.apache.hadoop.yarn.api.records.ReservationRequest;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TestRLESparseResourceAllocation {
+
+  private static final Logger LOG = LoggerFactory
+      .getLogger(TestRLESparseResourceAllocation.class);
+
+  @Test
+  public void testBlocks() {
+    ResourceCalculator resCalc = new DefaultResourceCalculator();
+    Resource minAlloc = Resource.newInstance(1, 1);
+
+    RLESparseResourceAllocation rleSparseVector =
+        new RLESparseResourceAllocation(resCalc, minAlloc);
+    int[] alloc = { 10, 10, 10, 10, 10, 10 };
+    int start = 100;
+    Set<Entry<ReservationInterval, ReservationRequest>> inputs =
+        generateAllocation(start, alloc, false).entrySet();
+    for (Entry<ReservationInterval, ReservationRequest> ip : inputs) {
+      rleSparseVector.addInterval(ip.getKey(), ip.getValue());
+    }
+    LOG.info(rleSparseVector.toString());
+    Assert.assertFalse(rleSparseVector.isEmpty());
+    Assert.assertEquals(Resource.newInstance(0, 0),
+        rleSparseVector.getCapacityAtTime(99));
+    Assert.assertEquals(Resource.newInstance(0, 0),
+        rleSparseVector.getCapacityAtTime(start + alloc.length + 1));
+    for (int i = 0; i < alloc.length; i++) {
+      Assert.assertEquals(Resource.newInstance(1024 * (alloc[i]), (alloc[i])),
+          rleSparseVector.getCapacityAtTime(start + i));
+    }
+    Assert.assertEquals(Resource.newInstance(0, 0),
+        rleSparseVector.getCapacityAtTime(start + alloc.length + 2));
+    for (Entry<ReservationInterval, ReservationRequest> ip : inputs) {
+      rleSparseVector.removeInterval(ip.getKey(), ip.getValue());
+    }
+    LOG.info(rleSparseVector.toString());
+    for (int i = 0; i < alloc.length; i++) {
+      Assert.assertEquals(Resource.newInstance(0, 0),
+          rleSparseVector.getCapacityAtTime(start + i));
+    }
+    Assert.assertTrue(rleSparseVector.isEmpty());
+  }
+
+  @Test
+  public void testSteps() {
+    ResourceCalculator resCalc = new DefaultResourceCalculator();
+    Resource minAlloc = Resource.newInstance(1, 1);
+
+    RLESparseResourceAllocation rleSparseVector =
+        new RLESparseResourceAllocation(resCalc, minAlloc);
+    int[] alloc = { 10, 10, 10, 10, 10, 10 };
+    int start = 100;
+    Set<Entry<ReservationInterval, ReservationRequest>> inputs =
+        generateAllocation(start, alloc, true).entrySet();
+    for (Entry<ReservationInterval, ReservationRequest> ip : inputs) {
+      rleSparseVector.addInterval(ip.getKey(), ip.getValue());
+    }
+    LOG.info(rleSparseVector.toString());
+    Assert.assertFalse(rleSparseVector.isEmpty());
+    Assert.assertEquals(Resource.newInstance(0, 0),
+        rleSparseVector.getCapacityAtTime(99));
+    Assert.assertEquals(Resource.newInstance(0, 0),
+        rleSparseVector.getCapacityAtTime(start + alloc.length + 1));
+    for (int i = 0; i < alloc.length; i++) {
+      Assert.assertEquals(
+          Resource.newInstance(1024 * (alloc[i] + i), (alloc[i] + i)),
+          rleSparseVector.getCapacityAtTime(start + i));
+    }
+    Assert.assertEquals(Resource.newInstance(0, 0),
+        rleSparseVector.getCapacityAtTime(start + alloc.length + 2));
+    for (Entry<ReservationInterval, ReservationRequest> ip : inputs) {
+      rleSparseVector.removeInterval(ip.getKey(), ip.getValue());
+    }
+    LOG.info(rleSparseVector.toString());
+    for (int i = 0; i < alloc.length; i++) {
+      Assert.assertEquals(Resource.newInstance(0, 0),
+          rleSparseVector.getCapacityAtTime(start + i));
+    }
+    Assert.assertTrue(rleSparseVector.isEmpty());
+  }
+
+  @Test
+  public void testSkyline() {
+    ResourceCalculator resCalc = new DefaultResourceCalculator();
+    Resource minAlloc = Resource.newInstance(1, 1);
+
+    RLESparseResourceAllocation rleSparseVector =
+        new RLESparseResourceAllocation(resCalc, minAlloc);
+    int[] alloc = { 0, 5, 10, 10, 5, 0 };
+    int start = 100;
+    Set<Entry<ReservationInterval, ReservationRequest>> inputs =
+        generateAllocation(start, alloc, true).entrySet();
+    for (Entry<ReservationInterval, ReservationRequest> ip : inputs) {
+      rleSparseVector.addInterval(ip.getKey(), ip.getValue());
+    }
+    LOG.info(rleSparseVector.toString());
+    Assert.assertFalse(rleSparseVector.isEmpty());
+    Assert.assertEquals(Resource.newInstance(0, 0),
+        rleSparseVector.getCapacityAtTime(99));
+    Assert.assertEquals(Resource.newInstance(0, 0),
+        rleSparseVector.getCapacityAtTime(start + alloc.length + 1));
+    for (int i = 0; i < alloc.length; i++) {
+      Assert.assertEquals(
+          Resource.newInstance(1024 * (alloc[i] + i), (alloc[i] + i)),
+          rleSparseVector.getCapacityAtTime(start + i));
+    }
+    Assert.assertEquals(Resource.newInstance(0, 0),
+        rleSparseVector.getCapacityAtTime(start + alloc.length + 2));
+    for (Entry<ReservationInterval, ReservationRequest> ip : inputs) {
+      rleSparseVector.removeInterval(ip.getKey(), ip.getValue());
+    }
+    LOG.info(rleSparseVector.toString());
+    for (int i = 0; i < alloc.length; i++) {
+      Assert.assertEquals(Resource.newInstance(0, 0),
+          rleSparseVector.getCapacityAtTime(start + i));
+    }
+    Assert.assertTrue(rleSparseVector.isEmpty());
+  }
+
+  @Test
+  public void testZeroAlloaction() {
+    ResourceCalculator resCalc = new DefaultResourceCalculator();
+    Resource minAlloc = Resource.newInstance(1, 1);
+    RLESparseResourceAllocation rleSparseVector =
+        new RLESparseResourceAllocation(resCalc, minAlloc);
+    rleSparseVector.addInterval(new ReservationInterval(0, Long.MAX_VALUE),
+        ReservationRequest.newInstance(Resource.newInstance(0, 0), (0)));
+    LOG.info(rleSparseVector.toString());
+    Assert.assertEquals(Resource.newInstance(0, 0),
+        rleSparseVector.getCapacityAtTime(new Random().nextLong()));
+    Assert.assertTrue(rleSparseVector.isEmpty());
+  }
+
+  private Map<ReservationInterval, ReservationRequest> generateAllocation(
+      int startTime, int[] alloc, boolean isStep) {
+    Map<ReservationInterval, ReservationRequest> req =
+        new HashMap<ReservationInterval, ReservationRequest>();
+    int numContainers = 0;
+    for (int i = 0; i < alloc.length; i++) {
+      if (isStep) {
+        numContainers = alloc[i] + i;
+      } else {
+        numContainers = alloc[i];
+      }
+      req.put(new ReservationInterval(startTime + i, startTime + i + 1),
+
+      ReservationRequest.newInstance(Resource.newInstance(1024, 1),
+          (numContainers)));
+    }
+    return req;
+  }
+
+}