浏览代码

YARN-3736. Add RMStateStore apis to store and load accepted reservations for failover (adhoot via asuresh)

(cherry picked from commit f271d377357ad680924d19f07e6c8315e7c89bae)
Arun Suresh 9 年之前
父节点
当前提交
707b96fa58
共有 17 个文件被更改,包括 1027 次插入82 次删除
  1. 2 0
      hadoop-yarn-project/CHANGES.txt
  2. 162 47
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java
  3. 111 3
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/LeveldbRMStateStore.java
  4. 57 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java
  5. 23 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java
  6. 151 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
  7. 4 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreEventType.java
  8. 56 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreStoreReservationEvent.java
  9. 123 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
  10. 101 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemUtil.java
  11. 17 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/proto/yarn_server_resourcemanager_recovery.proto
  12. 186 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java
  13. 1 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java
  14. 6 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestLeveldbRMStateStore.java
  15. 1 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java
  16. 16 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java
  17. 10 26
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestInMemoryReservationAllocation.java

+ 2 - 0
hadoop-yarn-project/CHANGES.txt

@@ -101,6 +101,8 @@ Release 2.8.0 - UNRELEASED
     YARN-3853. Add docker container runtime support to LinuxContainterExecutor.
     (Sidharta Seethana via vvasudev)
 
+    YARN-3736. Add RMStateStore apis to store and load accepted reservations for
+    failover (adhoot via asuresh)
 
   IMPROVEMENTS
 

+ 162 - 47
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java

@@ -26,6 +26,7 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.EnumSet;
+import java.util.HashMap;
 import java.util.List;
 
 import org.apache.commons.logging.Log;
@@ -44,12 +45,14 @@ import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.security.token.delegation.DelegationKey;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ReservationId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.VersionProto;
 import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.AMRMTokenSecretManagerStateProto;
 import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ApplicationAttemptStateDataProto;
 import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ApplicationStateDataProto;
 import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.EpochProto;
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ReservationAllocationStateProto;
 import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
 import org.apache.hadoop.yarn.server.records.Version;
 import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
@@ -76,6 +79,8 @@ import com.google.common.annotations.VisibleForTesting;
  * Changes from 1.1 to 1.2, AMRMTokenSecretManager state has been saved
  * separately. The currentMasterkey and nextMasterkey have been stored.
  * Also, AMRMToken has been removed from ApplicationAttemptState.
+ *
+ * Changes from 1.2 to 1.3, Addition of ReservationSystem state.
  */
 public class FileSystemRMStateStore extends RMStateStore {
 
@@ -83,7 +88,7 @@ public class FileSystemRMStateStore extends RMStateStore {
 
   protected static final String ROOT_DIR_NAME = "FSRMStateRoot";
   protected static final Version CURRENT_VERSION_INFO = Version
-    .newInstance(1, 2);
+    .newInstance(1, 3);
   protected static final String AMRMTOKEN_SECRET_MANAGER_NODE =
       "AMRMTokenSecretManagerNode";
 
@@ -108,6 +113,8 @@ public class FileSystemRMStateStore extends RMStateStore {
   Path fsWorkingPath;
 
   Path amrmTokenSecretManagerRoot;
+  private Path reservationRoot;
+
   @Override
   public synchronized void initInternal(Configuration conf)
       throws Exception{
@@ -117,6 +124,7 @@ public class FileSystemRMStateStore extends RMStateStore {
     rmAppRoot = new Path(rootDirPath, RM_APP_ROOT);
     amrmTokenSecretManagerRoot =
         new Path(rootDirPath, AMRMTOKEN_SECRET_MANAGER_ROOT);
+    reservationRoot = new Path(rootDirPath, RESERVATION_SYSTEM_ROOT);
     fsNumRetries =
         conf.getInt(YarnConfiguration.FS_RM_STATE_STORE_NUM_RETRIES,
             YarnConfiguration.DEFAULT_FS_RM_STATE_STORE_NUM_RETRIES);
@@ -153,6 +161,7 @@ public class FileSystemRMStateStore extends RMStateStore {
     mkdirsWithRetries(rmDTSecretManagerRoot);
     mkdirsWithRetries(rmAppRoot);
     mkdirsWithRetries(amrmTokenSecretManagerRoot);
+    mkdirsWithRetries(reservationRoot);
   }
 
   @Override
@@ -222,9 +231,24 @@ public class FileSystemRMStateStore extends RMStateStore {
     loadRMAppState(rmState);
     // recover AMRMTokenSecretManager
     loadAMRMTokenSecretManagerState(rmState);
+    // recover reservation state
+    loadReservationSystemState(rmState);
     return rmState;
   }
 
+  private void loadReservationSystemState(RMState rmState) throws Exception {
+    try {
+      final ReservationStateFileProcessor fileProcessor = new
+          ReservationStateFileProcessor(rmState);
+      final Path rootDirectory = this.reservationRoot;
+
+      processDirectoriesOfFiles(fileProcessor, rootDirectory);
+    } catch (Exception e) {
+      LOG.error("Failed to load state.", e);
+      throw e;
+    }
+  }
+
   private void loadAMRMTokenSecretManagerState(RMState rmState)
       throws Exception {
     checkAndResumeUpdateOperation(amrmTokenSecretManagerRoot);
@@ -248,50 +272,12 @@ public class FileSystemRMStateStore extends RMStateStore {
 
   private void loadRMAppState(RMState rmState) throws Exception {
     try {
-      List<ApplicationAttemptStateData> attempts =
-          new ArrayList<ApplicationAttemptStateData>();
-
-      for (FileStatus appDir : listStatusWithRetries(rmAppRoot)) {
-        checkAndResumeUpdateOperation(appDir.getPath());
-        for (FileStatus childNodeStatus :
-            listStatusWithRetries(appDir.getPath())) {
-          assert childNodeStatus.isFile();
-          String childNodeName = childNodeStatus.getPath().getName();
-          if (checkAndRemovePartialRecordWithRetries(
-              childNodeStatus.getPath())) {
-            continue;
-          }
-          byte[] childData = readFileWithRetries(childNodeStatus.getPath(),
-                  childNodeStatus.getLen());
-          // Set attribute if not already set
-          setUnreadableBySuperuserXattrib(childNodeStatus.getPath());
-          if (childNodeName.startsWith(ApplicationId.appIdStrPrefix)) {
-            // application
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("Loading application from node: " + childNodeName);
-            }
-            ApplicationStateDataPBImpl appState =
-                new ApplicationStateDataPBImpl(
-                  ApplicationStateDataProto.parseFrom(childData));
-            ApplicationId appId =
-                appState.getApplicationSubmissionContext().getApplicationId();
-            rmState.appState.put(appId, appState);
-          } else if (childNodeName
-            .startsWith(ApplicationAttemptId.appAttemptIdStrPrefix)) {
-            // attempt
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("Loading application attempt from node: "
-                  + childNodeName);
-            }
-            ApplicationAttemptStateDataPBImpl attemptState =
-                new ApplicationAttemptStateDataPBImpl(
-                  ApplicationAttemptStateDataProto.parseFrom(childData));
-            attempts.add(attemptState);
-          } else {
-            LOG.info("Unknown child node with name: " + childNodeName);
-          }
-        }
-      }
+      List<ApplicationAttemptStateData> attempts = new ArrayList<>();
+      final RMAppStateFileProcessor rmAppStateFileProcessor =
+          new RMAppStateFileProcessor(rmState, attempts);
+      final Path rootDirectory = this.rmAppRoot;
+
+      processDirectoriesOfFiles(rmAppStateFileProcessor, rootDirectory);
 
       // go through all attempts and add them to their apps, Ideally, each
       // attempt node must have a corresponding app node, because remove
@@ -309,6 +295,29 @@ public class FileSystemRMStateStore extends RMStateStore {
     }
   }
 
+  private void processDirectoriesOfFiles(
+      RMStateFileProcessor rmAppStateFileProcessor, Path rootDirectory)
+    throws Exception {
+    for (FileStatus dir : listStatusWithRetries(rootDirectory)) {
+      checkAndResumeUpdateOperation(dir.getPath());
+      String dirName = dir.getPath().getName();
+      for (FileStatus fileNodeStatus : listStatusWithRetries(dir.getPath())) {
+        assert fileNodeStatus.isFile();
+        String fileName = fileNodeStatus.getPath().getName();
+        if (checkAndRemovePartialRecordWithRetries(fileNodeStatus.getPath())) {
+          continue;
+        }
+        byte[] fileData = readFileWithRetries(fileNodeStatus.getPath(),
+                fileNodeStatus.getLen());
+        // Set attribute if not already set
+        setUnreadableBySuperuserXattrib(fileNodeStatus.getPath());
+
+        rmAppStateFileProcessor.processChildNode(dirName, fileName,
+            fileData);
+      }
+    }
+  }
+
   private boolean checkAndRemovePartialRecord(Path record) throws IOException {
     // If the file ends with .tmp then it shows that it failed
     // during saving state into state store. The file will be deleted as a
@@ -843,6 +852,41 @@ public class FileSystemRMStateStore extends RMStateStore {
     }
   }
 
+  @Override
+  protected void storeReservationState(
+      ReservationAllocationStateProto reservationAllocation, String planName,
+      String reservationIdName) throws Exception {
+    Path planCreatePath = getNodePath(reservationRoot, planName);
+    mkdirsWithRetries(planCreatePath);
+    Path reservationPath = getNodePath(planCreatePath, reservationIdName);
+    LOG.info("Storing state for reservation " + reservationIdName + " from " +
+        "plan " + planName + " at path " + reservationPath);
+    byte[] reservationData = reservationAllocation.toByteArray();
+    writeFileWithRetries(reservationPath, reservationData, true);
+  }
+
+  @Override
+  protected void updateReservationState(
+      ReservationAllocationStateProto reservationAllocation, String planName,
+      String reservationIdName) throws Exception {
+    Path planCreatePath = getNodePath(reservationRoot, planName);
+    Path reservationPath = getNodePath(planCreatePath, reservationIdName);
+    LOG.info("Updating state for reservation " + reservationIdName + " from " +
+        "plan " + planName + " at path " + reservationPath);
+    byte[] reservationData = reservationAllocation.toByteArray();
+    updateFile(reservationPath, reservationData, true);
+  }
+
+  @Override
+  protected void removeReservationState(
+      String planName, String reservationIdName) throws Exception {
+    Path planCreatePath = getNodePath(reservationRoot, planName);
+    Path reservationPath = getNodePath(planCreatePath, reservationIdName);
+    LOG.info("Removing state for reservation " + reservationIdName + " from " +
+        "plan " + planName + " at path " + reservationPath);
+    deleteFileWithRetries(reservationPath);
+  }
+
   @VisibleForTesting
   public int getNumRetries() {
     return fsNumRetries;
@@ -853,8 +897,7 @@ public class FileSystemRMStateStore extends RMStateStore {
     return fsRetryInterval;
   }
 
-  private void setUnreadableBySuperuserXattrib(Path p)
-          throws IOException {
+  private void setUnreadableBySuperuserXattrib(Path p) throws IOException {
     if (fs.getScheme().toLowerCase().contains("hdfs")
         && intermediateEncryptionEnabled
         && !fs.getXAttrs(p).containsKey(UNREADABLE_BY_SUPERUSER_XATTRIB)) {
@@ -862,4 +905,76 @@ public class FileSystemRMStateStore extends RMStateStore {
         EnumSet.of(XAttrSetFlag.CREATE));
     }
   }
+
+  private static class ReservationStateFileProcessor implements
+      RMStateFileProcessor {
+    private RMState rmState;
+    public ReservationStateFileProcessor(RMState state) {
+      this.rmState = state;
+    }
+
+    @Override
+    public void processChildNode(String planName, String childNodeName,
+        byte[] childData) throws IOException {
+      ReservationAllocationStateProto allocationState =
+          ReservationAllocationStateProto.parseFrom(childData);
+      if (!rmState.getReservationState().containsKey(planName)) {
+        rmState.getReservationState().put(planName,
+            new HashMap<ReservationId, ReservationAllocationStateProto>());
+      }
+      ReservationId reservationId =
+          ReservationId.parseReservationId(childNodeName);
+      rmState.getReservationState().get(planName).put(reservationId,
+          allocationState);
+    }
+  }
+
+  private static class RMAppStateFileProcessor implements RMStateFileProcessor {
+    private RMState rmState;
+    private List<ApplicationAttemptStateData> attempts;
+
+    public RMAppStateFileProcessor(RMState rmState,
+        List<ApplicationAttemptStateData> attempts) {
+      this.rmState = rmState;
+      this.attempts = attempts;
+    }
+
+    @Override
+    public void processChildNode(String appDirName, String childNodeName,
+        byte[] childData)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      if (childNodeName.startsWith(ApplicationId.appIdStrPrefix)) {
+        // application
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Loading application from node: " + childNodeName);
+        }
+        ApplicationStateDataPBImpl appState =
+            new ApplicationStateDataPBImpl(
+                ApplicationStateDataProto.parseFrom(childData));
+        ApplicationId appId =
+            appState.getApplicationSubmissionContext().getApplicationId();
+        rmState.appState.put(appId, appState);
+      } else if (childNodeName.startsWith(
+          ApplicationAttemptId.appAttemptIdStrPrefix)) {
+        // attempt
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Loading application attempt from node: "
+              + childNodeName);
+        }
+        ApplicationAttemptStateDataPBImpl attemptState =
+            new ApplicationAttemptStateDataPBImpl(
+                ApplicationAttemptStateDataProto.parseFrom(childData));
+        attempts.add(attemptState);
+      } else {
+        LOG.info("Unknown child node with name: " + childNodeName);
+      }
+    }
+  }
+
+  // Interface for common state processing of directory of file layout
+  private interface RMStateFileProcessor {
+    void processChildNode(String appDirName, String childNodeName,
+        byte[] childData)
+        throws IOException;
+  }
 }

+ 111 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/LeveldbRMStateStore.java

@@ -27,6 +27,7 @@ import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.File;
 import java.io.IOException;
+import java.util.HashMap;
 import java.util.Map.Entry;
 
 import org.apache.commons.logging.Log;
@@ -39,6 +40,7 @@ import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.security.token.delegation.DelegationKey;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ReservationId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.VersionProto;
@@ -46,6 +48,7 @@ import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.AMRM
 import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.EpochProto;
 import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ApplicationAttemptStateDataProto;
 import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ApplicationStateDataProto;
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ReservationAllocationStateProto;
 import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
 import org.apache.hadoop.yarn.server.records.Version;
 import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
@@ -69,6 +72,9 @@ import org.iq80.leveldb.WriteBatch;
 
 import com.google.common.annotations.VisibleForTesting;
 
+/**
+ * Changes from 1.0 to 1.1, Addition of ReservationSystem state.
+ */
 public class LeveldbRMStateStore extends RMStateStore {
 
   public static final Log LOG =
@@ -84,9 +90,11 @@ public class LeveldbRMStateStore extends RMStateStore {
       RM_DT_SECRET_MANAGER_ROOT + SEPARATOR + "RMDTSequentialNumber";
   private static final String RM_APP_KEY_PREFIX =
       RM_APP_ROOT + SEPARATOR + ApplicationId.appIdStrPrefix;
+  private static final String RM_RESERVATION_KEY_PREFIX =
+      RESERVATION_SYSTEM_ROOT + SEPARATOR;
 
   private static final Version CURRENT_VERSION_INFO = Version
-      .newInstance(1, 0);
+      .newInstance(1, 1);
 
   private DB db;
 
@@ -112,6 +120,12 @@ public class LeveldbRMStateStore extends RMStateStore {
     return RM_DT_TOKEN_KEY_PREFIX + tokenId.getSequenceNumber();
   }
 
+  private String getReservationNodeKey(String planName,
+      String reservationId) {
+    return RESERVATION_SYSTEM_ROOT + SEPARATOR + planName + SEPARATOR
+        + reservationId;
+  }
+
   @Override
   protected void initInternal(Configuration conf) throws Exception {
   }
@@ -230,9 +244,51 @@ public class LeveldbRMStateStore extends RMStateStore {
      loadRMDTSecretManagerState(rmState);
      loadRMApps(rmState);
      loadAMRMTokenSecretManagerState(rmState);
+    loadReservationState(rmState);
     return rmState;
    }
 
+  private void loadReservationState(RMState rmState) throws IOException {
+    int numReservations = 0;
+    LeveldbIterator iter = null;
+    try {
+      iter = new LeveldbIterator(db);
+      iter.seek(bytes(RM_RESERVATION_KEY_PREFIX));
+      while (iter.hasNext()) {
+        Entry<byte[],byte[]> entry = iter.next();
+        String key = asString(entry.getKey());
+
+        String planReservationString =
+            key.substring(RM_RESERVATION_KEY_PREFIX.length());
+        String[] parts = planReservationString.split(SEPARATOR);
+        if (parts.length != 2) {
+          LOG.warn("Incorrect reservation state key " + key);
+          continue;
+        }
+        String planName = parts[0];
+        String reservationName = parts[1];
+        ReservationAllocationStateProto allocationState =
+            ReservationAllocationStateProto.parseFrom(entry.getValue());
+        if (!rmState.getReservationState().containsKey(planName)) {
+          rmState.getReservationState().put(planName,
+              new HashMap<ReservationId, ReservationAllocationStateProto>());
+        }
+        ReservationId reservationId =
+            ReservationId.parseReservationId(reservationName);
+        rmState.getReservationState().get(planName).put(reservationId,
+            allocationState);
+        numReservations++;
+      }
+    } catch (DBException e) {
+      throw new IOException(e);
+    } finally {
+      if (iter != null) {
+        iter.close();
+      }
+    }
+    LOG.info("Recovered " + numReservations + " reservations");
+  }
+
   private void loadRMDTSecretManagerState(RMState state) throws IOException {
     int numKeys = loadRMDTSecretManagerKeys(state);
     LOG.info("Recovered " + numKeys + " RM delegation token master keys");
@@ -544,7 +600,59 @@ public class LeveldbRMStateStore extends RMStateStore {
       throw new IOException(e);
     }
   }
-  
+
+  @Override
+  protected void storeReservationState(
+      ReservationAllocationStateProto reservationAllocation, String planName,
+      String reservationIdName) throws Exception {
+    try {
+      WriteBatch batch = db.createWriteBatch();
+      try {
+        String key = getReservationNodeKey(planName, reservationIdName);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Storing state for reservation " + reservationIdName
+              + " plan " + planName + " at " + key);
+        }
+        batch.put(bytes(key), reservationAllocation.toByteArray());
+        db.write(batch);
+      } finally {
+        batch.close();
+      }
+    } catch (DBException e) {
+      throw new IOException(e);
+    }
+  }
+
+  @Override
+  protected void updateReservationState(
+      ReservationAllocationStateProto reservationAllocation, String planName,
+      String reservationIdName) throws Exception {
+    storeReservationState(reservationAllocation, planName,
+        reservationIdName);
+  }
+
+  @Override
+  protected void removeReservationState(String planName,
+      String reservationIdName) throws Exception {
+    try {
+      WriteBatch batch = db.createWriteBatch();
+      try {
+        String reservationKey =
+            getReservationNodeKey(planName, reservationIdName);
+        batch.delete(bytes(reservationKey));
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Removing state for reservation " + reservationIdName
+              + " plan " + planName + " at " + reservationKey);
+        }
+        db.write(batch);
+      } finally {
+        batch.close();
+      }
+    } catch (DBException e) {
+      throw new IOException(e);
+    }
+  }
+
   private void storeOrUpdateRMDT(RMDelegationTokenIdentifier tokenId,
       Long renewDate, boolean isUpdate) throws IOException {
     String tokenKey = getRMDTTokenNodeKey(tokenId);
@@ -679,7 +787,7 @@ public class LeveldbRMStateStore extends RMStateStore {
       iter = new LeveldbIterator(db);
       iter.seekToFirst();
       while (iter.hasNext()) {
-        Entry<byte[],byte[]> entry = iter.next();
+        Entry<byte[], byte[]> entry = iter.next();
         LOG.info("entry: " + asString(entry.getKey()));
         ++numEntries;
       }

+ 57 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java

@@ -19,6 +19,7 @@
 package org.apache.hadoop.yarn.server.resourcemanager.recovery;
 
 import java.io.IOException;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
 
@@ -28,7 +29,9 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.token.delegation.DelegationKey;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ReservationId;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ReservationAllocationStateProto;
 import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
 import org.apache.hadoop.yarn.server.records.Version;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.AMRMTokenSecretManagerState;
@@ -223,6 +226,60 @@ public class MemoryRMStateStore extends RMStateStore {
     rmDTMasterKeyState.remove(delegationKey);
   }
 
+  @Override
+  protected synchronized void storeReservationState(
+      ReservationAllocationStateProto reservationAllocation, String planName,
+      String reservationIdName) throws Exception {
+    LOG.info("Storing reservationallocation for " + reservationIdName + " " +
+            "for plan " + planName);
+    Map<ReservationId, ReservationAllocationStateProto> planState =
+        state.getReservationState().get(planName);
+    if (planState == null) {
+      planState = new HashMap<>();
+      state.getReservationState().put(planName, planState);
+    }
+    ReservationId reservationId =
+        ReservationId.parseReservationId(reservationIdName);
+    planState.put(reservationId, reservationAllocation);
+  }
+
+  @Override
+  protected synchronized void updateReservationState(
+      ReservationAllocationStateProto reservationAllocation, String planName,
+      String reservationIdName) throws Exception {
+    LOG.info("Updating reservationallocation for " + reservationIdName + " " +
+            "for plan " + planName);
+    Map<ReservationId, ReservationAllocationStateProto> planState =
+        state.getReservationState().get(planName);
+    if (planState == null) {
+      throw new YarnRuntimeException("State for plan " + planName + " does " +
+          "not exist");
+    }
+    ReservationId reservationId =
+        ReservationId.parseReservationId(reservationIdName);
+    planState.put(reservationId, reservationAllocation);
+  }
+
+  @Override
+  protected synchronized void removeReservationState(
+      String planName, String reservationIdName) throws Exception {
+    LOG.info("Removing reservationallocation " + reservationIdName
+              + " for plan " + planName);
+
+    Map<ReservationId, ReservationAllocationStateProto> planState =
+        state.getReservationState().get(planName);
+    if (planState == null) {
+      throw new YarnRuntimeException("State for plan " + planName + " does " +
+          "not exist");
+    }
+    ReservationId reservationId =
+        ReservationId.parseReservationId(reservationIdName);
+    planState.remove(reservationId);
+    if (planState.isEmpty()) {
+      state.getReservationState().remove(planName);
+    }
+  }
+
   @Override
   protected Version loadVersion() throws Exception {
     return null;

+ 23 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java

@@ -24,6 +24,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.token.delegation.DelegationKey;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ReservationAllocationStateProto;
 import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
 import org.apache.hadoop.yarn.server.records.Version;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.AMRMTokenSecretManagerState;
@@ -101,6 +102,26 @@ public class NullRMStateStore extends RMStateStore {
     // Do nothing
   }
 
+  @Override
+  protected void storeReservationState(
+      ReservationAllocationStateProto reservationAllocation, String planName,
+      String reservationIdName) throws Exception {
+    // Do nothing
+  }
+
+  @Override
+  protected void removeReservationState(String planName,
+      String reservationIdName) throws Exception {
+      // Do nothing
+  }
+
+  @Override
+  protected void updateReservationState(
+      ReservationAllocationStateProto reservationAllocation, String planName,
+      String reservationIdName) throws Exception {
+      // Do nothing
+  }
+
   @Override
   public void removeRMDTMasterKeyState(DelegationKey delegationKey) throws Exception {
     // Do nothing
@@ -155,4 +176,6 @@ public class NullRMStateStore extends RMStateStore {
     // Do nothing
   }
 
+
+
 }

+ 151 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java

@@ -43,11 +43,13 @@ import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.ReservationId;
 import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ReservationAllocationStateProto;
 import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
 import org.apache.hadoop.yarn.server.records.Version;
 import org.apache.hadoop.yarn.server.resourcemanager.RMFatalEvent;
@@ -87,6 +89,8 @@ public abstract class RMStateStore extends AbstractService {
       "RMDTSequenceNumber_";
   protected static final String AMRMTOKEN_SECRET_MANAGER_ROOT =
       "AMRMTokenSecretManagerRoot";
+  protected static final String RESERVATION_SYSTEM_ROOT =
+      "ReservationSystemRoot";
   protected static final String VERSION_NODE = "RMVersionNode";
   protected static final String EPOCH_NODE = "EpochNode";
   private ResourceManager resourceManager;
@@ -136,7 +140,16 @@ public abstract class RMStateStore extends AbstractService {
               new UpdateRMDTTransition())
        .addTransition(RMStateStoreState.ACTIVE, RMStateStoreState.ACTIVE,
            RMStateStoreEventType.UPDATE_AMRM_TOKEN,
-              new StoreOrUpdateAMRMTokenTransition())
+           new StoreOrUpdateAMRMTokenTransition())
+      .addTransition(RMStateStoreState.ACTIVE, RMStateStoreState.ACTIVE,
+          RMStateStoreEventType.STORE_RESERVATION,
+          new StoreReservationAllocationTransition())
+      .addTransition(RMStateStoreState.ACTIVE, RMStateStoreState.ACTIVE,
+          RMStateStoreEventType.UPDATE_RESERVATION,
+          new UpdateReservationAllocationTransition())
+      .addTransition(RMStateStoreState.ACTIVE, RMStateStoreState.ACTIVE,
+          RMStateStoreEventType.REMOVE_RESERVATION,
+          new RemoveReservationAllocationTransition())
       .addTransition(RMStateStoreState.ACTIVE, RMStateStoreState.FENCED,
           RMStateStoreEventType.FENCED)
       .addTransition(RMStateStoreState.FENCED, RMStateStoreState.FENCED,
@@ -152,7 +165,10 @@ public abstract class RMStateStore extends AbstractService {
           RMStateStoreEventType.STORE_DELEGATION_TOKEN,
           RMStateStoreEventType.REMOVE_DELEGATION_TOKEN,
           RMStateStoreEventType.UPDATE_DELEGATION_TOKEN,
-          RMStateStoreEventType.UPDATE_AMRM_TOKEN));
+          RMStateStoreEventType.UPDATE_AMRM_TOKEN,
+          RMStateStoreEventType.STORE_RESERVATION,
+          RMStateStoreEventType.UPDATE_RESERVATION,
+          RMStateStoreEventType.REMOVE_RESERVATION));
 
   private final StateMachine<RMStateStoreState,
                              RMStateStoreEventType,
@@ -415,6 +431,80 @@ public abstract class RMStateStore extends AbstractService {
     }
   }
 
+  private static class StoreReservationAllocationTransition implements
+      SingleArcTransition<RMStateStore, RMStateStoreEvent> {
+    @Override
+    public void transition(RMStateStore store, RMStateStoreEvent event) {
+      if (!(event instanceof RMStateStoreStoreReservationEvent)) {
+        // should never happen
+        LOG.error("Illegal event type: " + event.getClass());
+        return;
+      }
+      RMStateStoreStoreReservationEvent reservationEvent =
+          (RMStateStoreStoreReservationEvent) event;
+      try {
+        LOG.info("Storing reservation allocation." + reservationEvent
+                .getReservationIdName());
+        store.storeReservationState(
+            reservationEvent.getReservationAllocation(),
+            reservationEvent.getPlanName(),
+            reservationEvent.getReservationIdName());
+      } catch (Exception e) {
+        LOG.error("Error while storing reservation allocation.", e);
+        store.notifyStoreOperationFailed(e);
+      }
+    }
+  }
+
+  private static class UpdateReservationAllocationTransition implements
+      SingleArcTransition<RMStateStore, RMStateStoreEvent> {
+    @Override
+    public void transition(RMStateStore store, RMStateStoreEvent event) {
+      if (!(event instanceof RMStateStoreStoreReservationEvent)) {
+        // should never happen
+        LOG.error("Illegal event type: " + event.getClass());
+        return;
+      }
+      RMStateStoreStoreReservationEvent reservationEvent =
+          (RMStateStoreStoreReservationEvent) event;
+      try {
+        LOG.info("Updating reservation allocation." + reservationEvent
+                .getReservationIdName());
+        store.updateReservationState(
+            reservationEvent.getReservationAllocation(),
+            reservationEvent.getPlanName(),
+            reservationEvent.getReservationIdName());
+      } catch (Exception e) {
+        LOG.error("Error while updating reservation allocation.", e);
+        store.notifyStoreOperationFailed(e);
+      }
+    }
+  }
+
+  private static class RemoveReservationAllocationTransition implements
+      SingleArcTransition<RMStateStore, RMStateStoreEvent> {
+    @Override
+    public void transition(RMStateStore store, RMStateStoreEvent event) {
+      if (!(event instanceof RMStateStoreStoreReservationEvent)) {
+        // should never happen
+        LOG.error("Illegal event type: " + event.getClass());
+        return;
+      }
+      RMStateStoreStoreReservationEvent reservationEvent =
+          (RMStateStoreStoreReservationEvent) event;
+      try {
+        LOG.info("Removing reservation allocation." + reservationEvent
+                .getReservationIdName());
+        store.removeReservationState(
+            reservationEvent.getPlanName(),
+            reservationEvent.getReservationIdName());
+      } catch (Exception e) {
+        LOG.error("Error while removing reservation allocation.", e);
+        store.notifyStoreOperationFailed(e);
+      }
+    }
+  }
+
   public RMStateStore() {
     super(RMStateStore.class.getName());
     ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
@@ -457,6 +547,9 @@ public abstract class RMStateStore extends AbstractService {
 
     AMRMTokenSecretManagerState amrmTokenSecretManagerState = null;
 
+    private Map<String, Map<ReservationId, ReservationAllocationStateProto>>
+        reservationState = new TreeMap<>();
+
     public Map<ApplicationId, ApplicationStateData> getApplicationState() {
       return appState;
     }
@@ -468,6 +561,11 @@ public abstract class RMStateStore extends AbstractService {
     public AMRMTokenSecretManagerState getAMRMTokenSecretManagerState() {
       return amrmTokenSecretManagerState;
     }
+
+    public Map<String, Map<ReservationId, ReservationAllocationStateProto>>
+        getReservationState() {
+      return reservationState;
+    }
   }
     
   private Dispatcher rmDispatcher;
@@ -745,6 +843,57 @@ public abstract class RMStateStore extends AbstractService {
         RMStateStoreEventType.REMOVE_MASTERKEY));
   }
 
+  /**
+   * Blocking Apis to maintain reservation state.
+   */
+  public void storeNewReservation(
+      ReservationAllocationStateProto reservationAllocation, String planName,
+      String reservationIdName) {
+    handleStoreEvent(new RMStateStoreStoreReservationEvent(
+        reservationAllocation, RMStateStoreEventType.STORE_RESERVATION,
+        planName, reservationIdName));
+  }
+
+  public void updateReservation(
+      ReservationAllocationStateProto reservationAllocation,
+      String planName, String reservationIdName) {
+    handleStoreEvent(new RMStateStoreStoreReservationEvent(
+        reservationAllocation, RMStateStoreEventType.UPDATE_RESERVATION,
+        planName, reservationIdName));
+  }
+
+  public void removeReservation(String planName, String reservationIdName) {
+    handleStoreEvent(new RMStateStoreStoreReservationEvent(
+            null, RMStateStoreEventType.REMOVE_RESERVATION,
+            planName, reservationIdName));
+  }
+
+  /**
+   * Blocking API
+   * Derived classes must implement this method to store the state of
+   * a reservation allocation.
+   */
+  protected abstract void storeReservationState(
+      ReservationAllocationStateProto reservationAllocation, String planName,
+      String reservationIdName) throws Exception;
+
+  /**
+   * Blocking API
+   * Derived classes must implement this method to remove the state of
+   * a reservation allocation.
+   */
+  protected abstract void removeReservationState(String planName,
+      String reservationIdName) throws Exception;
+
+  /**
+   * Blocking API
+   * Derived classes must implement this method to update the state of
+   * a reservation allocation.
+   */
+  protected abstract void updateReservationState(
+      ReservationAllocationStateProto reservationAllocation, String planName,
+      String reservationIdName) throws Exception;
+
   /**
    * Blocking API
    * Derived classes must implement this method to remove the state of

+ 4 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreEventType.java

@@ -32,5 +32,8 @@ public enum RMStateStoreEventType {
   STORE_DELEGATION_TOKEN,
   REMOVE_DELEGATION_TOKEN,
   UPDATE_DELEGATION_TOKEN,
-  UPDATE_AMRM_TOKEN
+  UPDATE_AMRM_TOKEN,
+  STORE_RESERVATION,
+  UPDATE_RESERVATION,
+  REMOVE_RESERVATION,
 }

+ 56 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreStoreReservationEvent.java

@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.recovery;
+
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ReservationAllocationStateProto;
+
+/**
+ * Event representing maintaining ReservationSystem state.
+ */
+public class RMStateStoreStoreReservationEvent extends RMStateStoreEvent {
+
+  private ReservationAllocationStateProto reservationAllocation;
+  private String planName;
+  private String reservationIdName;
+
+  public RMStateStoreStoreReservationEvent(RMStateStoreEventType type) {
+    super(type);
+  }
+
+  public RMStateStoreStoreReservationEvent(
+      ReservationAllocationStateProto reservationAllocationState,
+      RMStateStoreEventType type, String planName, String reservationIdName) {
+    this(type);
+    this.reservationAllocation = reservationAllocationState;
+    this.planName = planName;
+    this.reservationIdName = reservationIdName;
+  }
+
+  public ReservationAllocationStateProto getReservationAllocation() {
+    return reservationAllocation;
+  }
+
+  public String getPlanName() {
+    return planName;
+  }
+
+  public String getReservationIdName() {
+    return reservationIdName;
+  }
+}

+ 123 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java

@@ -26,6 +26,7 @@ import java.nio.charset.Charset;
 import java.security.NoSuchAlgorithmException;
 import java.security.SecureRandom;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 
 import com.google.common.base.Preconditions;
@@ -45,6 +46,7 @@ import org.apache.hadoop.security.token.delegation.DelegationKey;
 import org.apache.hadoop.util.ZKUtil;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ReservationId;
 import org.apache.hadoop.yarn.conf.HAUtil;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
@@ -53,6 +55,7 @@ import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.AMRM
 import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ApplicationAttemptStateDataProto;
 import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ApplicationStateDataProto;
 import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.EpochProto;
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ReservationAllocationStateProto;
 import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
 import org.apache.hadoop.yarn.server.records.Version;
 import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
@@ -107,9 +110,18 @@ import com.google.common.annotations.VisibleForTesting;
  *        |----- currentMasterKey
  *        |----- nextMasterKey
  *
+ * |-- RESERVATION_SYSTEM_ROOT
+ *        |------PLAN_1
+ *        |      |------ RESERVATION_1
+ *        |      |------ RESERVATION_2
+ *        |      ....
+ *        |------PLAN_2
+ *        ....
  * Note: Changes from 1.1 to 1.2 - AMRMTokenSecretManager state has been saved
  * separately. The currentMasterkey and nextMasterkey have been stored.
  * Also, AMRMToken has been removed from ApplicationAttemptState.
+ *
+ * Changes from 1.2 to 1.3, Addition of ReservationSystem state.
  */
 @Private
 @Unstable
@@ -120,7 +132,7 @@ public class ZKRMStateStore extends RMStateStore {
 
   protected static final String ROOT_ZNODE_NAME = "ZKRMStateRoot";
   protected static final Version CURRENT_VERSION_INFO = Version
-      .newInstance(1, 2);
+      .newInstance(1, 3);
   private static final String RM_DELEGATION_TOKENS_ROOT_ZNODE_NAME =
       "RMDelegationTokensRoot";
   private static final String RM_DT_SEQUENTIAL_NUMBER_ZNODE_NAME =
@@ -142,6 +154,7 @@ public class ZKRMStateStore extends RMStateStore {
   private String delegationTokensRootPath;
   private String dtSequenceNumberPath;
   private String amrmTokenSecretManagerRoot;
+  private String reservationRoot;
   @VisibleForTesting
   protected String znodeWorkingPath;
 
@@ -258,6 +271,7 @@ public class ZKRMStateStore extends RMStateStore {
         RM_DT_SEQUENTIAL_NUMBER_ZNODE_NAME);
     amrmTokenSecretManagerRoot =
         getNodePath(zkRootNodePath, AMRMTOKEN_SECRET_MANAGER_ROOT);
+    reservationRoot = getNodePath(zkRootNodePath, RESERVATION_SYSTEM_ROOT);
   }
 
   @Override
@@ -279,6 +293,7 @@ public class ZKRMStateStore extends RMStateStore {
     create(delegationTokensRootPath);
     create(dtSequenceNumberPath);
     create(amrmTokenSecretManagerRoot);
+    create(reservationRoot);
   }
 
   private void logRootNodeAcls(String prefix) throws Exception {
@@ -375,9 +390,41 @@ public class ZKRMStateStore extends RMStateStore {
     loadRMAppState(rmState);
     // recover AMRMTokenSecretManager
     loadAMRMTokenSecretManagerState(rmState);
+    // recover reservation state
+    loadReservationSystemState(rmState);
     return rmState;
   }
 
+  private void loadReservationSystemState(RMState rmState) throws Exception {
+    List<String> planNodes = getChildren(reservationRoot);
+    for (String planName : planNodes) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Loading plan from znode: " + planName);
+      }
+      String planNodePath = getNodePath(reservationRoot, planName);
+
+      List<String> reservationNodes = getChildren(planNodePath);
+      for (String reservationNodeName : reservationNodes) {
+        String reservationNodePath = getNodePath(planNodePath,
+            reservationNodeName);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Loading reservation from znode: " + reservationNodePath);
+        }
+        byte[] reservationData = getData(reservationNodePath);
+        ReservationAllocationStateProto allocationState =
+            ReservationAllocationStateProto.parseFrom(reservationData);
+        if (!rmState.getReservationState().containsKey(planName)) {
+          rmState.getReservationState().put(planName,
+              new HashMap<ReservationId, ReservationAllocationStateProto>());
+        }
+        ReservationId reservationId =
+            ReservationId.parseReservationId(reservationNodeName);
+        rmState.getReservationState().get(planName).put(reservationId,
+            allocationState);
+      }
+    }
+  }
+
   private void loadAMRMTokenSecretManagerState(RMState rmState)
       throws Exception {
     byte[] data = getData(amrmTokenSecretManagerRoot);
@@ -763,6 +810,81 @@ public class ZKRMStateStore extends RMStateStore {
     safeSetData(amrmTokenSecretManagerRoot, stateData, -1);
   }
 
+  @Override
+  protected synchronized void removeReservationState(String planName,
+      String reservationIdName)
+      throws Exception {
+    String planNodePath =
+        getNodePath(reservationRoot, planName);
+    String reservationPath = getNodePath(planNodePath,
+        reservationIdName);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Removing reservationallocation " + reservationIdName + " for" +
+          " plan " + planName);
+    }
+    safeDelete(reservationPath);
+
+    List<String> reservationNodes = getChildren(planNodePath);
+    if (reservationNodes.isEmpty()) {
+      safeDelete(planNodePath);
+    }
+  }
+
+  @Override
+  protected synchronized void storeReservationState(
+      ReservationAllocationStateProto reservationAllocation, String planName,
+      String reservationIdName)
+      throws Exception {
+    SafeTransaction trx = new SafeTransaction();
+    addOrUpdateReservationState(
+        reservationAllocation, planName, reservationIdName, trx, false);
+    trx.commit();
+  }
+
+  @Override
+  protected synchronized void updateReservationState(
+      ReservationAllocationStateProto reservationAllocation, String planName,
+      String reservationIdName)
+      throws Exception {
+    SafeTransaction trx = new SafeTransaction();
+    addOrUpdateReservationState(
+        reservationAllocation, planName, reservationIdName, trx, true);
+    trx.commit();
+  }
+
+  private void addOrUpdateReservationState(
+      ReservationAllocationStateProto reservationAllocation, String planName,
+      String reservationIdName, SafeTransaction trx, boolean isUpdate)
+      throws Exception {
+    String planCreatePath =
+        getNodePath(reservationRoot, planName);
+    String reservationPath = getNodePath(planCreatePath,
+        reservationIdName);
+    byte[] reservationData = reservationAllocation.toByteArray();
+
+    if (!exists(planCreatePath)) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Creating plan node: " + planName + " at: " + planCreatePath);
+      }
+      trx.create(planCreatePath, null, zkAcl, CreateMode.PERSISTENT);
+    }
+
+    if (isUpdate) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Updating reservation: " + reservationIdName + " in plan:"
+            + planName + " at: " + reservationPath);
+      }
+      trx.setData(reservationPath, reservationData, -1);
+    } else {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Storing reservation: " + reservationIdName + " in plan:"
+            + planName + " at: " + reservationPath);
+      }
+      trx.create(reservationPath, reservationData, zkAcl,
+          CreateMode.PERSISTENT);
+    }
+  }
+
   /**
    * Utility function to ensure that the configured base znode exists.
    * This recursively creates the znode as well as all of its parents.

+ 101 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemUtil.java

@@ -18,11 +18,23 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.reservation;
 
+import org.apache.hadoop.yarn.api.records.ReservationDefinition;
+import org.apache.hadoop.yarn.api.records.ReservationId;
 import org.apache.hadoop.yarn.api.records.ReservationRequest;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.impl.pb.ReservationDefinitionPBImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.ReservationIdPBImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl;
+import org.apache.hadoop.yarn.proto.YarnProtos;
+import org.apache.hadoop.yarn.proto.YarnProtos.ReservationDefinitionProto;
+import org.apache.hadoop.yarn.proto.YarnProtos.ReservationIdProto;
+import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto;
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ReservationAllocationStateProto;
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ResourceAllocationRequestProto;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.Resources;
-
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 /**
@@ -52,4 +64,92 @@ public final class ReservationSystemUtil {
     }
     return resources;
   }
+
+  public static ReservationAllocationStateProto buildStateProto(
+      ReservationAllocation allocation) {
+    ReservationAllocationStateProto.Builder builder =
+        ReservationAllocationStateProto.newBuilder();
+
+    builder.setAcceptanceTimestamp(allocation.getAcceptanceTime());
+    builder.setContainsGangs(allocation.containsGangs());
+    builder.setStartTime(allocation.getStartTime());
+    builder.setEndTime(allocation.getEndTime());
+    builder.setUser(allocation.getUser());
+    ReservationDefinitionProto definitionProto = convertToProtoFormat(
+        allocation.getReservationDefinition());
+    builder.setReservationDefinition(definitionProto);
+
+    for (Map.Entry<ReservationInterval, Resource> entry :
+        allocation.getAllocationRequests().entrySet()) {
+      ResourceAllocationRequestProto p =
+          ResourceAllocationRequestProto.newBuilder()
+          .setStartTime(entry.getKey().getStartTime())
+          .setEndTime(entry.getKey().getEndTime())
+          .setResource(convertToProtoFormat(entry.getValue()))
+          .build();
+      builder.addAllocationRequests(p);
+    }
+
+    ReservationAllocationStateProto allocationProto = builder.build();
+    return allocationProto;
+  }
+
+  private static ReservationDefinitionProto convertToProtoFormat(
+      ReservationDefinition reservationDefinition) {
+    return ((ReservationDefinitionPBImpl)reservationDefinition).getProto();
+  }
+
+  public static ResourceProto convertToProtoFormat(Resource e) {
+    return YarnProtos.ResourceProto.newBuilder()
+        .setMemory(e.getMemory())
+        .setVirtualCores(e.getVirtualCores())
+        .build();
+  }
+
+  public static Map<ReservationInterval, Resource> toAllocations(
+      List<ResourceAllocationRequestProto> allocationRequestsList) {
+    Map<ReservationInterval, Resource> allocations = new HashMap<>();
+    for (ResourceAllocationRequestProto proto : allocationRequestsList) {
+      allocations.put(
+          new ReservationInterval(proto.getStartTime(), proto.getEndTime()),
+          convertFromProtoFormat(proto.getResource()));
+    }
+    return allocations;
+  }
+
+  private static ResourcePBImpl convertFromProtoFormat(ResourceProto resource) {
+    return new ResourcePBImpl(resource);
+  }
+
+  public static ReservationDefinitionPBImpl convertFromProtoFormat(
+      ReservationDefinitionProto r) {
+    return new ReservationDefinitionPBImpl(r);
+  }
+
+  public static ReservationIdPBImpl convertFromProtoFormat(
+      ReservationIdProto r) {
+    return new ReservationIdPBImpl(r);
+  }
+
+  public static ReservationId toReservationId(
+      ReservationIdProto reservationId) {
+    return new ReservationIdPBImpl(reservationId);
+  }
+
+  public static InMemoryReservationAllocation toInMemoryAllocation(
+      String planName, ReservationId reservationId,
+      ReservationAllocationStateProto allocationState, Resource minAlloc,
+      ResourceCalculator planResourceCalculator) {
+    ReservationDefinition definition =
+        convertFromProtoFormat(
+            allocationState.getReservationDefinition());
+    Map<ReservationInterval, Resource> allocations = toAllocations(
+            allocationState.getAllocationRequestsList());
+    InMemoryReservationAllocation allocation =
+        new InMemoryReservationAllocation(reservationId, definition,
+        allocationState.getUser(), planName, allocationState.getStartTime(),
+        allocationState.getEndTime(), allocations, planResourceCalculator,
+        minAlloc, allocationState.getContainsGangs());
+    return allocation;
+  }
 }

+ 17 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/proto/yarn_server_resourcemanager_recovery.proto

@@ -96,4 +96,20 @@ message AMRMTokenSecretManagerStateProto {
 message RMDelegationTokenIdentifierDataProto {
   optional YARNDelegationTokenIdentifierProto token_identifier = 1;
   optional int64 renewDate = 2;
-}
+}
+
+message ResourceAllocationRequestProto {
+  optional int64 start_time = 1;
+  optional int64 end_time = 2;
+  optional ResourceProto resource = 3;
+}
+
+message ReservationAllocationStateProto {
+  optional ReservationDefinitionProto reservation_definition = 1;
+  repeated ResourceAllocationRequestProto allocation_requests = 2;
+  optional int64 start_time = 3;
+  optional int64 end_time = 4;
+  optional string user = 5;
+  optional bool contains_gangs = 6;
+  optional int64 acceptance_timestamp = 7;
+}

+ 186 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java

@@ -18,6 +18,9 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.recovery;
 
+import org.apache.hadoop.yarn.api.records.ReservationDefinition;
+import org.apache.hadoop.yarn.api.records.ReservationId;
+import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.event.Event;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
@@ -36,6 +39,11 @@ import java.util.Map;
 
 import javax.crypto.SecretKey;
 
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ReservationAllocationStateProto;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystemTestUtil;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystemUtil;
+import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.junit.Assert;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -63,6 +71,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.Applicatio
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMDTSecretManagerState;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.AMRMTokenSecretManagerState;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.InMemoryReservationAllocation;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationAllocation;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AggregateAppResourceUsage;
@@ -691,4 +701,180 @@ public class RMStateStoreTestBase {
 
     store.close();
   }
+
+  public void testReservationStateStore(
+      RMStateStoreHelper stateStoreHelper) throws Exception {
+    RMStateStore store = stateStoreHelper.getRMStateStore();
+    TestDispatcher dispatcher = new TestDispatcher();
+    store.setRMDispatcher(dispatcher);
+
+    RMContext rmContext = mock(RMContext.class);
+    when(rmContext.getStateStore()).thenReturn(store);
+
+    long ts = System.currentTimeMillis();
+    ReservationId r1 = ReservationId.newInstance(ts, 1);
+    int start = 1;
+    int[] alloc = { 10, 10, 10, 10, 10 };
+    ResourceCalculator res = new DefaultResourceCalculator();
+    Resource minAlloc = Resource.newInstance(1024, 1);
+    boolean hasGang = true;
+    String planName = "dedicated";
+    ReservationDefinition rDef =
+        ReservationSystemTestUtil.createSimpleReservationDefinition(
+            start, start + alloc.length + 1, alloc.length);
+    ReservationAllocation allocation = new InMemoryReservationAllocation(
+        r1, rDef, "u3", planName, 0, 0 + alloc.length,
+        ReservationSystemTestUtil.generateAllocation(0L, 1L, alloc), res,
+        minAlloc, hasGang);
+    ReservationAllocationStateProto allocationStateProto =
+        ReservationSystemUtil.buildStateProto(allocation);
+    assertAllocationStateEqual(allocation, allocationStateProto);
+
+    // 1. Load empty store and verify no errors
+    store = stateStoreHelper.getRMStateStore();
+    when(rmContext.getStateStore()).thenReturn(store);
+    store.setRMDispatcher(dispatcher);
+    RMState state = store.loadState();
+    Map<String, Map<ReservationId, ReservationAllocationStateProto>>
+      reservationState = state.getReservationState();
+    Assert.assertNotNull(reservationState);
+
+    // 2. Store single reservation and verify
+    String reservationIdName = r1.toString();
+    rmContext.getStateStore().storeNewReservation(
+        allocationStateProto,
+        planName, reservationIdName);
+
+
+    // load state and verify new state
+    validateStoredReservation(
+        stateStoreHelper, dispatcher, rmContext, r1, planName, allocation,
+        allocationStateProto);
+
+    // 3. update state test
+    alloc = new int[]{6, 6, 6};
+    hasGang = false;
+    allocation = new InMemoryReservationAllocation(
+        r1, rDef, "u3", planName, 2, 2 + alloc.length,
+        ReservationSystemTestUtil.generateAllocation(1L, 2L, alloc), res,
+        minAlloc, hasGang);
+    allocationStateProto =
+        ReservationSystemUtil.buildStateProto(allocation);
+    rmContext.getStateStore().updateReservation(
+        allocationStateProto,
+        planName, reservationIdName);
+
+    // load state and verify updated reservation
+    validateStoredReservation(
+        stateStoreHelper, dispatcher, rmContext, r1, planName, allocation,
+        allocationStateProto);
+
+    // 4. add a second one and remove the first one
+    ReservationId r2 = ReservationId.newInstance(ts, 2);
+    ReservationAllocation allocation2 = new InMemoryReservationAllocation(
+        r2, rDef, "u3", planName, 0, 0 + alloc.length,
+        ReservationSystemTestUtil.generateAllocation(0L, 1L, alloc), res,
+        minAlloc, hasGang);
+    ReservationAllocationStateProto allocationStateProto2 =
+        ReservationSystemUtil.buildStateProto(allocation2);
+    String reservationIdName2 = r2.toString();
+
+    rmContext.getStateStore().storeNewReservation(
+        allocationStateProto2,
+        planName, reservationIdName2);
+    rmContext.getStateStore().removeReservation(planName, reservationIdName);
+
+    // load state and verify r1 is removed and r2 is still there
+    Map<ReservationId, ReservationAllocationStateProto> reservations;
+
+    store = stateStoreHelper.getRMStateStore();
+    when(rmContext.getStateStore()).thenReturn(store);
+    store.setRMDispatcher(dispatcher);
+    state = store.loadState();
+    reservationState = state.getReservationState();
+    Assert.assertNotNull(reservationState);
+    reservations = reservationState.get(planName);
+    Assert.assertNotNull(reservations);
+    ReservationAllocationStateProto storedReservationAllocation =
+        reservations.get(r1);
+    Assert.assertNull("Removed reservation should not be available in store",
+        storedReservationAllocation);
+
+    storedReservationAllocation = reservations.get(r2);
+    assertAllocationStateEqual(
+        allocationStateProto2, storedReservationAllocation);
+    assertAllocationStateEqual(allocation2, storedReservationAllocation);
+
+
+    // 5. remove last reservation removes the plan state
+    rmContext.getStateStore().removeReservation(planName, reservationIdName2);
+
+    store = stateStoreHelper.getRMStateStore();
+    when(rmContext.getStateStore()).thenReturn(store);
+    store.setRMDispatcher(dispatcher);
+    state = store.loadState();
+    reservationState = state.getReservationState();
+    Assert.assertNotNull(reservationState);
+    reservations = reservationState.get(planName);
+    Assert.assertNull(reservations);
+  }
+
+  private void validateStoredReservation(
+      RMStateStoreHelper stateStoreHelper, TestDispatcher dispatcher,
+      RMContext rmContext, ReservationId r1, String planName,
+      ReservationAllocation allocation,
+      ReservationAllocationStateProto allocationStateProto) throws Exception {
+    RMStateStore store = stateStoreHelper.getRMStateStore();
+    when(rmContext.getStateStore()).thenReturn(store);
+    store.setRMDispatcher(dispatcher);
+    RMState state = store.loadState();
+    Map<String, Map<ReservationId, ReservationAllocationStateProto>>
+        reservationState = state.getReservationState();
+    Assert.assertNotNull(reservationState);
+    Map<ReservationId, ReservationAllocationStateProto> reservations =
+        reservationState.get(planName);
+    Assert.assertNotNull(reservations);
+    ReservationAllocationStateProto storedReservationAllocation =
+        reservations.get(r1);
+    Assert.assertNotNull(storedReservationAllocation);
+
+    assertAllocationStateEqual(
+        allocationStateProto, storedReservationAllocation);
+    assertAllocationStateEqual(allocation, storedReservationAllocation);
+  }
+
+  void assertAllocationStateEqual(
+      ReservationAllocationStateProto expected,
+      ReservationAllocationStateProto actual) {
+
+    Assert.assertEquals(
+        expected.getAcceptanceTimestamp(), actual.getAcceptanceTimestamp());
+    Assert.assertEquals(expected.getStartTime(), actual.getStartTime());
+    Assert.assertEquals(expected.getEndTime(), actual.getEndTime());
+    Assert.assertEquals(expected.getContainsGangs(), actual.getContainsGangs());
+    Assert.assertEquals(expected.getUser(), actual.getUser());
+    assertEquals(
+        expected.getReservationDefinition(), actual.getReservationDefinition());
+    assertEquals(expected.getAllocationRequestsList(),
+        actual.getAllocationRequestsList());
+  }
+
+  void assertAllocationStateEqual(
+      ReservationAllocation expected,
+      ReservationAllocationStateProto actual) {
+    Assert.assertEquals(
+        expected.getAcceptanceTime(), actual.getAcceptanceTimestamp());
+    Assert.assertEquals(expected.getStartTime(), actual.getStartTime());
+    Assert.assertEquals(expected.getEndTime(), actual.getEndTime());
+    Assert.assertEquals(expected.containsGangs(), actual.getContainsGangs());
+    Assert.assertEquals(expected.getUser(), actual.getUser());
+    assertEquals(
+        expected.getReservationDefinition(),
+        ReservationSystemUtil.convertFromProtoFormat(
+            actual.getReservationDefinition()));
+    assertEquals(
+        expected.getAllocationRequests(),
+        ReservationSystemUtil.toAllocations(
+            actual.getAllocationRequestsList()));
+  }
 }

+ 1 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java

@@ -186,6 +186,7 @@ public class TestFSRMStateStore extends RMStateStoreTestBase {
       testDeleteStore(fsTester);
       testRemoveApplication(fsTester);
       testAMRMTokenSecretManagerStateStore(fsTester);
+      testReservationStateStore(fsTester);
     } finally {
       cluster.shutdown();
     }

+ 6 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestLeveldbRMStateStore.java

@@ -102,6 +102,12 @@ public class TestLeveldbRMStateStore extends RMStateStoreTestBase {
     testAMRMTokenSecretManagerStateStore(tester);
   }
 
+  @Test(timeout = 60000)
+  public void testReservation() throws Exception {
+    LeveldbStateStoreTester tester = new LeveldbStateStoreTester();
+    testReservationStateStore(tester);
+  }
+
   class LeveldbStateStoreTester implements RMStateStoreHelper {
 
     @Override

+ 1 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java

@@ -174,6 +174,7 @@ public class TestZKRMStateStore extends RMStateStoreTestBase {
     testDeleteStore(zkTester);
     testRemoveApplication(zkTester);
     testAMRMTokenSecretManagerStateStore(zkTester);
+    testReservationStateStore(zkTester);
     ((TestZKRMStateStoreTester.TestZKRMStateStoreInternal)
         zkTester.getRMStateStore()).testRetryingCreateRootDir();
   }

+ 16 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java

@@ -185,6 +185,22 @@ public class ReservationSystemTestUtil {
     return scheduler;
   }
 
+  public static ReservationDefinition createSimpleReservationDefinition(
+      long arrival, long deadline, long duration) {
+    // create a request with a single atomic ask
+    ReservationRequest r =
+        ReservationRequest.newInstance(Resource.newInstance(1024, 1), 1, 1,
+            duration);
+    ReservationDefinition rDef = new ReservationDefinitionPBImpl();
+    ReservationRequests reqs = new ReservationRequestsPBImpl();
+    reqs.setReservationResources(Collections.singletonList(r));
+    reqs.setInterpreter(ReservationRequestInterpreter.R_ALL);
+    rDef.setReservationRequests(reqs);
+    rDef.setArrival(arrival);
+    rDef.setDeadline(deadline);
+    return rDef;
+  }
+
   @SuppressWarnings("unchecked")
   public CapacityScheduler mockCapacityScheduler(int numContainers)
       throws IOException {

+ 10 - 26
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestInMemoryReservationAllocation.java

@@ -17,7 +17,6 @@
  *******************************************************************************/
 package org.apache.hadoop.yarn.server.resourcemanager.reservation;
 
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Random;
@@ -25,11 +24,7 @@ import java.util.Random;
 import org.apache.hadoop.yarn.api.records.ReservationDefinition;
 import org.apache.hadoop.yarn.api.records.ReservationId;
 import org.apache.hadoop.yarn.api.records.ReservationRequest;
-import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter;
-import org.apache.hadoop.yarn.api.records.ReservationRequests;
 import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.impl.pb.ReservationDefinitionPBImpl;
-import org.apache.hadoop.yarn.api.records.impl.pb.ReservationRequestsPBImpl;
 import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.junit.After;
@@ -67,7 +62,8 @@ public class TestInMemoryReservationAllocation {
     int[] alloc = { 10, 10, 10, 10, 10, 10 };
     int start = 100;
     ReservationDefinition rDef =
-        createSimpleReservationDefinition(start, start + alloc.length + 1,
+        ReservationSystemTestUtil.createSimpleReservationDefinition(
+            start, start + alloc.length + 1,
             alloc.length);
     Map<ReservationInterval, Resource> allocations =
         generateAllocation(start, alloc, false, false);
@@ -89,7 +85,8 @@ public class TestInMemoryReservationAllocation {
     int[] alloc = { 10, 10, 10, 10, 10, 10 };
     int start = 100;
     ReservationDefinition rDef =
-        createSimpleReservationDefinition(start, start + alloc.length + 1,
+        ReservationSystemTestUtil.createSimpleReservationDefinition(
+            start, start + alloc.length + 1,
             alloc.length);
     Map<ReservationInterval, Resource> allocations =
         generateAllocation(start, alloc, true, false);
@@ -112,7 +109,8 @@ public class TestInMemoryReservationAllocation {
     int[] alloc = { 0, 5, 10, 10, 5, 0 };
     int start = 100;
     ReservationDefinition rDef =
-        createSimpleReservationDefinition(start, start + alloc.length + 1,
+        ReservationSystemTestUtil.createSimpleReservationDefinition(
+            start, start + alloc.length + 1,
             alloc.length);
     Map<ReservationInterval, Resource> allocations =
         generateAllocation(start, alloc, true, false);
@@ -135,7 +133,8 @@ public class TestInMemoryReservationAllocation {
     int[] alloc = {};
     long start = 0;
     ReservationDefinition rDef =
-        createSimpleReservationDefinition(start, start + alloc.length + 1,
+        ReservationSystemTestUtil.createSimpleReservationDefinition(
+            start, start + alloc.length + 1,
             alloc.length);
     Map<ReservationInterval, Resource> allocations =
         new HashMap<ReservationInterval, Resource>();
@@ -154,7 +153,8 @@ public class TestInMemoryReservationAllocation {
     int[] alloc = { 10, 10, 10, 10, 10, 10 };
     int start = 100;
     ReservationDefinition rDef =
-        createSimpleReservationDefinition(start, start + alloc.length + 1,
+        ReservationSystemTestUtil.createSimpleReservationDefinition(
+            start, start + alloc.length + 1,
             alloc.length);
     boolean isGang = true;
     Map<ReservationInterval, Resource> allocations =
@@ -184,22 +184,6 @@ public class TestInMemoryReservationAllocation {
     Assert.assertEquals(start + alloc.length + 1, rAllocation.getEndTime());
   }
 
-  private ReservationDefinition createSimpleReservationDefinition(long arrival,
-      long deadline, long duration) {
-    // create a request with a single atomic ask
-    ReservationRequest r =
-        ReservationRequest.newInstance(Resource.newInstance(1024, 1), 1, 1,
-            duration);
-    ReservationDefinition rDef = new ReservationDefinitionPBImpl();
-    ReservationRequests reqs = new ReservationRequestsPBImpl();
-    reqs.setReservationResources(Collections.singletonList(r));
-    reqs.setInterpreter(ReservationRequestInterpreter.R_ALL);
-    rDef.setReservationRequests(reqs);
-    rDef.setArrival(arrival);
-    rDef.setDeadline(deadline);
-    return rDef;
-  }
-
   private Map<ReservationInterval, Resource> generateAllocation(
       int startTime, int[] alloc, boolean isStep, boolean isGang) {
     Map<ReservationInterval, Resource> req =