Quellcode durchsuchen

YARN-3736. Add RMStateStore apis to store and load accepted reservations for failover (adhoot via asuresh)

Arun Suresh vor 10 Jahren
Ursprung
Commit
f271d37735
17 geänderte Dateien mit 1027 neuen und 82 gelöschten Zeilen
  1. 2 0
      hadoop-yarn-project/CHANGES.txt
  2. 162 47
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java
  3. 111 3
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/LeveldbRMStateStore.java
  4. 57 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java
  5. 23 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java
  6. 151 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
  7. 4 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreEventType.java
  8. 56 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreStoreReservationEvent.java
  9. 123 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
  10. 101 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemUtil.java
  11. 17 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/proto/yarn_server_resourcemanager_recovery.proto
  12. 186 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java
  13. 1 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java
  14. 6 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestLeveldbRMStateStore.java
  15. 1 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java
  16. 16 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java
  17. 10 26
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestInMemoryReservationAllocation.java

+ 2 - 0
hadoop-yarn-project/CHANGES.txt

@@ -156,6 +156,8 @@ Release 2.8.0 - UNRELEASED
     YARN-3853. Add docker container runtime support to LinuxContainterExecutor.
     (Sidharta Seethana via vvasudev)
 
+    YARN-3736. Add RMStateStore apis to store and load accepted reservations for
+    failover (adhoot via asuresh)
 
   IMPROVEMENTS
 

+ 162 - 47
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java

@@ -26,6 +26,7 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.EnumSet;
+import java.util.HashMap;
 import java.util.List;
 
 import org.apache.commons.logging.Log;
@@ -44,12 +45,14 @@ import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.security.token.delegation.DelegationKey;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ReservationId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.VersionProto;
 import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.AMRMTokenSecretManagerStateProto;
 import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ApplicationAttemptStateDataProto;
 import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ApplicationStateDataProto;
 import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.EpochProto;
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ReservationAllocationStateProto;
 import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
 import org.apache.hadoop.yarn.server.records.Version;
 import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
@@ -76,6 +79,8 @@ import com.google.common.annotations.VisibleForTesting;
  * Changes from 1.1 to 1.2, AMRMTokenSecretManager state has been saved
  * separately. The currentMasterkey and nextMasterkey have been stored.
  * Also, AMRMToken has been removed from ApplicationAttemptState.
+ *
+ * Changes from 1.2 to 1.3, Addition of ReservationSystem state.
  */
 public class FileSystemRMStateStore extends RMStateStore {
 
@@ -83,7 +88,7 @@ public class FileSystemRMStateStore extends RMStateStore {
 
   protected static final String ROOT_DIR_NAME = "FSRMStateRoot";
   protected static final Version CURRENT_VERSION_INFO = Version
-    .newInstance(1, 2);
+    .newInstance(1, 3);
   protected static final String AMRMTOKEN_SECRET_MANAGER_NODE =
       "AMRMTokenSecretManagerNode";
 
@@ -108,6 +113,8 @@ public class FileSystemRMStateStore extends RMStateStore {
   Path fsWorkingPath;
 
   Path amrmTokenSecretManagerRoot;
+  private Path reservationRoot;
+
   @Override
   public synchronized void initInternal(Configuration conf)
       throws Exception{
@@ -117,6 +124,7 @@ public class FileSystemRMStateStore extends RMStateStore {
     rmAppRoot = new Path(rootDirPath, RM_APP_ROOT);
     amrmTokenSecretManagerRoot =
         new Path(rootDirPath, AMRMTOKEN_SECRET_MANAGER_ROOT);
+    reservationRoot = new Path(rootDirPath, RESERVATION_SYSTEM_ROOT);
     fsNumRetries =
         conf.getInt(YarnConfiguration.FS_RM_STATE_STORE_NUM_RETRIES,
             YarnConfiguration.DEFAULT_FS_RM_STATE_STORE_NUM_RETRIES);
@@ -153,6 +161,7 @@ public class FileSystemRMStateStore extends RMStateStore {
     mkdirsWithRetries(rmDTSecretManagerRoot);
     mkdirsWithRetries(rmAppRoot);
     mkdirsWithRetries(amrmTokenSecretManagerRoot);
+    mkdirsWithRetries(reservationRoot);
   }
 
   @Override
@@ -222,9 +231,24 @@ public class FileSystemRMStateStore extends RMStateStore {
     loadRMAppState(rmState);
     // recover AMRMTokenSecretManager
     loadAMRMTokenSecretManagerState(rmState);
+    // recover reservation state
+    loadReservationSystemState(rmState);
     return rmState;
   }
 
+  private void loadReservationSystemState(RMState rmState) throws Exception {
+    try {
+      final ReservationStateFileProcessor fileProcessor = new
+          ReservationStateFileProcessor(rmState);
+      final Path rootDirectory = this.reservationRoot;
+
+      processDirectoriesOfFiles(fileProcessor, rootDirectory);
+    } catch (Exception e) {
+      LOG.error("Failed to load state.", e);
+      throw e;
+    }
+  }
+
   private void loadAMRMTokenSecretManagerState(RMState rmState)
       throws Exception {
     checkAndResumeUpdateOperation(amrmTokenSecretManagerRoot);
@@ -248,50 +272,12 @@ public class FileSystemRMStateStore extends RMStateStore {
 
   private void loadRMAppState(RMState rmState) throws Exception {
     try {
-      List<ApplicationAttemptStateData> attempts =
-          new ArrayList<ApplicationAttemptStateData>();
-
-      for (FileStatus appDir : listStatusWithRetries(rmAppRoot)) {
-        checkAndResumeUpdateOperation(appDir.getPath());
-        for (FileStatus childNodeStatus :
-            listStatusWithRetries(appDir.getPath())) {
-          assert childNodeStatus.isFile();
-          String childNodeName = childNodeStatus.getPath().getName();
-          if (checkAndRemovePartialRecordWithRetries(
-              childNodeStatus.getPath())) {
-            continue;
-          }
-          byte[] childData = readFileWithRetries(childNodeStatus.getPath(),
-                  childNodeStatus.getLen());
-          // Set attribute if not already set
-          setUnreadableBySuperuserXattrib(childNodeStatus.getPath());
-          if (childNodeName.startsWith(ApplicationId.appIdStrPrefix)) {
-            // application
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("Loading application from node: " + childNodeName);
-            }
-            ApplicationStateDataPBImpl appState =
-                new ApplicationStateDataPBImpl(
-                  ApplicationStateDataProto.parseFrom(childData));
-            ApplicationId appId =
-                appState.getApplicationSubmissionContext().getApplicationId();
-            rmState.appState.put(appId, appState);
-          } else if (childNodeName
-            .startsWith(ApplicationAttemptId.appAttemptIdStrPrefix)) {
-            // attempt
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("Loading application attempt from node: "
-                  + childNodeName);
-            }
-            ApplicationAttemptStateDataPBImpl attemptState =
-                new ApplicationAttemptStateDataPBImpl(
-                  ApplicationAttemptStateDataProto.parseFrom(childData));
-            attempts.add(attemptState);
-          } else {
-            LOG.info("Unknown child node with name: " + childNodeName);
-          }
-        }
-      }
+      List<ApplicationAttemptStateData> attempts = new ArrayList<>();
+      final RMAppStateFileProcessor rmAppStateFileProcessor =
+          new RMAppStateFileProcessor(rmState, attempts);
+      final Path rootDirectory = this.rmAppRoot;
+
+      processDirectoriesOfFiles(rmAppStateFileProcessor, rootDirectory);
 
       // go through all attempts and add them to their apps, Ideally, each
       // attempt node must have a corresponding app node, because remove
@@ -309,6 +295,29 @@ public class FileSystemRMStateStore extends RMStateStore {
     }
   }
 
+  private void processDirectoriesOfFiles(
+      RMStateFileProcessor rmAppStateFileProcessor, Path rootDirectory)
+    throws Exception {
+    for (FileStatus dir : listStatusWithRetries(rootDirectory)) {
+      checkAndResumeUpdateOperation(dir.getPath());
+      String dirName = dir.getPath().getName();
+      for (FileStatus fileNodeStatus : listStatusWithRetries(dir.getPath())) {
+        assert fileNodeStatus.isFile();
+        String fileName = fileNodeStatus.getPath().getName();
+        if (checkAndRemovePartialRecordWithRetries(fileNodeStatus.getPath())) {
+          continue;
+        }
+        byte[] fileData = readFileWithRetries(fileNodeStatus.getPath(),
+                fileNodeStatus.getLen());
+        // Set attribute if not already set
+        setUnreadableBySuperuserXattrib(fileNodeStatus.getPath());
+
+        rmAppStateFileProcessor.processChildNode(dirName, fileName,
+            fileData);
+      }
+    }
+  }
+
   private boolean checkAndRemovePartialRecord(Path record) throws IOException {
     // If the file ends with .tmp then it shows that it failed
     // during saving state into state store. The file will be deleted as a
@@ -843,6 +852,41 @@ public class FileSystemRMStateStore extends RMStateStore {
     }
   }
 
+  @Override
+  protected void storeReservationState(
+      ReservationAllocationStateProto reservationAllocation, String planName,
+      String reservationIdName) throws Exception {
+    Path planCreatePath = getNodePath(reservationRoot, planName);
+    mkdirsWithRetries(planCreatePath);
+    Path reservationPath = getNodePath(planCreatePath, reservationIdName);
+    LOG.info("Storing state for reservation " + reservationIdName + " from " +
+        "plan " + planName + " at path " + reservationPath);
+    byte[] reservationData = reservationAllocation.toByteArray();
+    writeFileWithRetries(reservationPath, reservationData, true);
+  }
+
+  @Override
+  protected void updateReservationState(
+      ReservationAllocationStateProto reservationAllocation, String planName,
+      String reservationIdName) throws Exception {
+    Path planCreatePath = getNodePath(reservationRoot, planName);
+    Path reservationPath = getNodePath(planCreatePath, reservationIdName);
+    LOG.info("Updating state for reservation " + reservationIdName + " from " +
+        "plan " + planName + " at path " + reservationPath);
+    byte[] reservationData = reservationAllocation.toByteArray();
+    updateFile(reservationPath, reservationData, true);
+  }
+
+  @Override
+  protected void removeReservationState(
+      String planName, String reservationIdName) throws Exception {
+    Path planCreatePath = getNodePath(reservationRoot, planName);
+    Path reservationPath = getNodePath(planCreatePath, reservationIdName);
+    LOG.info("Removing state for reservation " + reservationIdName + " from " +
+        "plan " + planName + " at path " + reservationPath);
+    deleteFileWithRetries(reservationPath);
+  }
+
   @VisibleForTesting
   public int getNumRetries() {
     return fsNumRetries;
@@ -853,8 +897,7 @@ public class FileSystemRMStateStore extends RMStateStore {
     return fsRetryInterval;
   }
 
-  private void setUnreadableBySuperuserXattrib(Path p)
-          throws IOException {
+  private void setUnreadableBySuperuserXattrib(Path p) throws IOException {
     if (fs.getScheme().toLowerCase().contains("hdfs")
         && intermediateEncryptionEnabled
         && !fs.getXAttrs(p).containsKey(UNREADABLE_BY_SUPERUSER_XATTRIB)) {
@@ -862,4 +905,76 @@ public class FileSystemRMStateStore extends RMStateStore {
         EnumSet.of(XAttrSetFlag.CREATE));
     }
   }
+
+  private static class ReservationStateFileProcessor implements
+      RMStateFileProcessor {
+    private RMState rmState;
+    public ReservationStateFileProcessor(RMState state) {
+      this.rmState = state;
+    }
+
+    @Override
+    public void processChildNode(String planName, String childNodeName,
+        byte[] childData) throws IOException {
+      ReservationAllocationStateProto allocationState =
+          ReservationAllocationStateProto.parseFrom(childData);
+      if (!rmState.getReservationState().containsKey(planName)) {
+        rmState.getReservationState().put(planName,
+            new HashMap<ReservationId, ReservationAllocationStateProto>());
+      }
+      ReservationId reservationId =
+          ReservationId.parseReservationId(childNodeName);
+      rmState.getReservationState().get(planName).put(reservationId,
+          allocationState);
+    }
+  }
+
+  private static class RMAppStateFileProcessor implements RMStateFileProcessor {
+    private RMState rmState;
+    private List<ApplicationAttemptStateData> attempts;
+
+    public RMAppStateFileProcessor(RMState rmState,
+        List<ApplicationAttemptStateData> attempts) {
+      this.rmState = rmState;
+      this.attempts = attempts;
+    }
+
+    @Override
+    public void processChildNode(String appDirName, String childNodeName,
+        byte[] childData)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      if (childNodeName.startsWith(ApplicationId.appIdStrPrefix)) {
+        // application
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Loading application from node: " + childNodeName);
+        }
+        ApplicationStateDataPBImpl appState =
+            new ApplicationStateDataPBImpl(
+                ApplicationStateDataProto.parseFrom(childData));
+        ApplicationId appId =
+            appState.getApplicationSubmissionContext().getApplicationId();
+        rmState.appState.put(appId, appState);
+      } else if (childNodeName.startsWith(
+          ApplicationAttemptId.appAttemptIdStrPrefix)) {
+        // attempt
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Loading application attempt from node: "
+              + childNodeName);
+        }
+        ApplicationAttemptStateDataPBImpl attemptState =
+            new ApplicationAttemptStateDataPBImpl(
+                ApplicationAttemptStateDataProto.parseFrom(childData));
+        attempts.add(attemptState);
+      } else {
+        LOG.info("Unknown child node with name: " + childNodeName);
+      }
+    }
+  }
+
+  // Interface for common state processing of directory of file layout
+  private interface RMStateFileProcessor {
+    void processChildNode(String appDirName, String childNodeName,
+        byte[] childData)
+        throws IOException;
+  }
 }

+ 111 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/LeveldbRMStateStore.java

@@ -27,6 +27,7 @@ import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.File;
 import java.io.IOException;
+import java.util.HashMap;
 import java.util.Map.Entry;
 
 import org.apache.commons.logging.Log;
@@ -39,6 +40,7 @@ import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.security.token.delegation.DelegationKey;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ReservationId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.VersionProto;
@@ -46,6 +48,7 @@ import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.AMRM
 import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.EpochProto;
 import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ApplicationAttemptStateDataProto;
 import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ApplicationStateDataProto;
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ReservationAllocationStateProto;
 import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
 import org.apache.hadoop.yarn.server.records.Version;
 import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
@@ -69,6 +72,9 @@ import org.iq80.leveldb.WriteBatch;
 
 import com.google.common.annotations.VisibleForTesting;
 
+/**
+ * Changes from 1.0 to 1.1, Addition of ReservationSystem state.
+ */
 public class LeveldbRMStateStore extends RMStateStore {
 
   public static final Log LOG =
@@ -84,9 +90,11 @@ public class LeveldbRMStateStore extends RMStateStore {
       RM_DT_SECRET_MANAGER_ROOT + SEPARATOR + "RMDTSequentialNumber";
   private static final String RM_APP_KEY_PREFIX =
       RM_APP_ROOT + SEPARATOR + ApplicationId.appIdStrPrefix;
+  private static final String RM_RESERVATION_KEY_PREFIX =
+      RESERVATION_SYSTEM_ROOT + SEPARATOR;
 
   private static final Version CURRENT_VERSION_INFO = Version
-      .newInstance(1, 0);
+      .newInstance(1, 1);
 
   private DB db;
 
@@ -112,6 +120,12 @@ public class LeveldbRMStateStore extends RMStateStore {
     return RM_DT_TOKEN_KEY_PREFIX + tokenId.getSequenceNumber();
   }
 
+  private String getReservationNodeKey(String planName,
+      String reservationId) {
+    return RESERVATION_SYSTEM_ROOT + SEPARATOR + planName + SEPARATOR
+        + reservationId;
+  }
+
   @Override
   protected void initInternal(Configuration conf) throws Exception {
   }
@@ -230,9 +244,51 @@ public class LeveldbRMStateStore extends RMStateStore {
      loadRMDTSecretManagerState(rmState);
      loadRMApps(rmState);
      loadAMRMTokenSecretManagerState(rmState);
+    loadReservationState(rmState);
     return rmState;
    }
 
+  private void loadReservationState(RMState rmState) throws IOException {
+    int numReservations = 0;
+    LeveldbIterator iter = null;
+    try {
+      iter = new LeveldbIterator(db);
+      iter.seek(bytes(RM_RESERVATION_KEY_PREFIX));
+      while (iter.hasNext()) {
+        Entry<byte[],byte[]> entry = iter.next();
+        String key = asString(entry.getKey());
+
+        String planReservationString =
+            key.substring(RM_RESERVATION_KEY_PREFIX.length());
+        String[] parts = planReservationString.split(SEPARATOR);
+        if (parts.length != 2) {
+          LOG.warn("Incorrect reservation state key " + key);
+          continue;
+        }
+        String planName = parts[0];
+        String reservationName = parts[1];
+        ReservationAllocationStateProto allocationState =
+            ReservationAllocationStateProto.parseFrom(entry.getValue());
+        if (!rmState.getReservationState().containsKey(planName)) {
+          rmState.getReservationState().put(planName,
+              new HashMap<ReservationId, ReservationAllocationStateProto>());
+        }
+        ReservationId reservationId =
+            ReservationId.parseReservationId(reservationName);
+        rmState.getReservationState().get(planName).put(reservationId,
+            allocationState);
+        numReservations++;
+      }
+    } catch (DBException e) {
+      throw new IOException(e);
+    } finally {
+      if (iter != null) {
+        iter.close();
+      }
+    }
+    LOG.info("Recovered " + numReservations + " reservations");
+  }
+
   private void loadRMDTSecretManagerState(RMState state) throws IOException {
     int numKeys = loadRMDTSecretManagerKeys(state);
     LOG.info("Recovered " + numKeys + " RM delegation token master keys");
@@ -544,7 +600,59 @@ public class LeveldbRMStateStore extends RMStateStore {
       throw new IOException(e);
     }
   }
-  
+
+  @Override
+  protected void storeReservationState(
+      ReservationAllocationStateProto reservationAllocation, String planName,
+      String reservationIdName) throws Exception {
+    try {
+      WriteBatch batch = db.createWriteBatch();
+      try {
+        String key = getReservationNodeKey(planName, reservationIdName);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Storing state for reservation " + reservationIdName
+              + " plan " + planName + " at " + key);
+        }
+        batch.put(bytes(key), reservationAllocation.toByteArray());
+        db.write(batch);
+      } finally {
+        batch.close();
+      }
+    } catch (DBException e) {
+      throw new IOException(e);
+    }
+  }
+
+  @Override
+  protected void updateReservationState(
+      ReservationAllocationStateProto reservationAllocation, String planName,
+      String reservationIdName) throws Exception {
+    storeReservationState(reservationAllocation, planName,
+        reservationIdName);
+  }
+
+  @Override
+  protected void removeReservationState(String planName,
+      String reservationIdName) throws Exception {
+    try {
+      WriteBatch batch = db.createWriteBatch();
+      try {
+        String reservationKey =
+            getReservationNodeKey(planName, reservationIdName);
+        batch.delete(bytes(reservationKey));
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Removing state for reservation " + reservationIdName
+              + " plan " + planName + " at " + reservationKey);
+        }
+        db.write(batch);
+      } finally {
+        batch.close();
+      }
+    } catch (DBException e) {
+      throw new IOException(e);
+    }
+  }
+
   private void storeOrUpdateRMDT(RMDelegationTokenIdentifier tokenId,
       Long renewDate, boolean isUpdate) throws IOException {
     String tokenKey = getRMDTTokenNodeKey(tokenId);
@@ -679,7 +787,7 @@ public class LeveldbRMStateStore extends RMStateStore {
       iter = new LeveldbIterator(db);
       iter.seekToFirst();
       while (iter.hasNext()) {
-        Entry<byte[],byte[]> entry = iter.next();
+        Entry<byte[], byte[]> entry = iter.next();
         LOG.info("entry: " + asString(entry.getKey()));
         ++numEntries;
       }

+ 57 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java

@@ -19,6 +19,7 @@
 package org.apache.hadoop.yarn.server.resourcemanager.recovery;
 
 import java.io.IOException;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
 
@@ -28,7 +29,9 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.token.delegation.DelegationKey;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ReservationId;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ReservationAllocationStateProto;
 import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
 import org.apache.hadoop.yarn.server.records.Version;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.AMRMTokenSecretManagerState;
@@ -223,6 +226,60 @@ public class MemoryRMStateStore extends RMStateStore {
     rmDTMasterKeyState.remove(delegationKey);
   }
 
+  @Override
+  protected synchronized void storeReservationState(
+      ReservationAllocationStateProto reservationAllocation, String planName,
+      String reservationIdName) throws Exception {
+    LOG.info("Storing reservationallocation for " + reservationIdName + " " +
+            "for plan " + planName);
+    Map<ReservationId, ReservationAllocationStateProto> planState =
+        state.getReservationState().get(planName);
+    if (planState == null) {
+      planState = new HashMap<>();
+      state.getReservationState().put(planName, planState);
+    }
+    ReservationId reservationId =
+        ReservationId.parseReservationId(reservationIdName);
+    planState.put(reservationId, reservationAllocation);
+  }
+
+  @Override
+  protected synchronized void updateReservationState(
+      ReservationAllocationStateProto reservationAllocation, String planName,
+      String reservationIdName) throws Exception {
+    LOG.info("Updating reservationallocation for " + reservationIdName + " " +
+            "for plan " + planName);
+    Map<ReservationId, ReservationAllocationStateProto> planState =
+        state.getReservationState().get(planName);
+    if (planState == null) {
+      throw new YarnRuntimeException("State for plan " + planName + " does " +
+          "not exist");
+    }
+    ReservationId reservationId =
+        ReservationId.parseReservationId(reservationIdName);
+    planState.put(reservationId, reservationAllocation);
+  }
+
+  @Override
+  protected synchronized void removeReservationState(
+      String planName, String reservationIdName) throws Exception {
+    LOG.info("Removing reservationallocation " + reservationIdName
+              + " for plan " + planName);
+
+    Map<ReservationId, ReservationAllocationStateProto> planState =
+        state.getReservationState().get(planName);
+    if (planState == null) {
+      throw new YarnRuntimeException("State for plan " + planName + " does " +
+          "not exist");
+    }
+    ReservationId reservationId =
+        ReservationId.parseReservationId(reservationIdName);
+    planState.remove(reservationId);
+    if (planState.isEmpty()) {
+      state.getReservationState().remove(planName);
+    }
+  }
+
   @Override
   protected Version loadVersion() throws Exception {
     return null;

+ 23 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java

@@ -24,6 +24,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.token.delegation.DelegationKey;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ReservationAllocationStateProto;
 import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
 import org.apache.hadoop.yarn.server.records.Version;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.AMRMTokenSecretManagerState;
@@ -101,6 +102,26 @@ public class NullRMStateStore extends RMStateStore {
     // Do nothing
   }
 
+  @Override
+  protected void storeReservationState(
+      ReservationAllocationStateProto reservationAllocation, String planName,
+      String reservationIdName) throws Exception {
+    // Do nothing
+  }
+
+  @Override
+  protected void removeReservationState(String planName,
+      String reservationIdName) throws Exception {
+      // Do nothing
+  }
+
+  @Override
+  protected void updateReservationState(
+      ReservationAllocationStateProto reservationAllocation, String planName,
+      String reservationIdName) throws Exception {
+      // Do nothing
+  }
+
   @Override
   public void removeRMDTMasterKeyState(DelegationKey delegationKey) throws Exception {
     // Do nothing
@@ -155,4 +176,6 @@ public class NullRMStateStore extends RMStateStore {
     // Do nothing
   }
 
+
+
 }

+ 151 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java

@@ -43,11 +43,13 @@ import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.ReservationId;
 import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ReservationAllocationStateProto;
 import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
 import org.apache.hadoop.yarn.server.records.Version;
 import org.apache.hadoop.yarn.server.resourcemanager.RMFatalEvent;
@@ -87,6 +89,8 @@ public abstract class RMStateStore extends AbstractService {
       "RMDTSequenceNumber_";
   protected static final String AMRMTOKEN_SECRET_MANAGER_ROOT =
       "AMRMTokenSecretManagerRoot";
+  protected static final String RESERVATION_SYSTEM_ROOT =
+      "ReservationSystemRoot";
   protected static final String VERSION_NODE = "RMVersionNode";
   protected static final String EPOCH_NODE = "EpochNode";
   private ResourceManager resourceManager;
@@ -136,7 +140,16 @@ public abstract class RMStateStore extends AbstractService {
               new UpdateRMDTTransition())
        .addTransition(RMStateStoreState.ACTIVE, RMStateStoreState.ACTIVE,
            RMStateStoreEventType.UPDATE_AMRM_TOKEN,
-              new StoreOrUpdateAMRMTokenTransition())
+           new StoreOrUpdateAMRMTokenTransition())
+      .addTransition(RMStateStoreState.ACTIVE, RMStateStoreState.ACTIVE,
+          RMStateStoreEventType.STORE_RESERVATION,
+          new StoreReservationAllocationTransition())
+      .addTransition(RMStateStoreState.ACTIVE, RMStateStoreState.ACTIVE,
+          RMStateStoreEventType.UPDATE_RESERVATION,
+          new UpdateReservationAllocationTransition())
+      .addTransition(RMStateStoreState.ACTIVE, RMStateStoreState.ACTIVE,
+          RMStateStoreEventType.REMOVE_RESERVATION,
+          new RemoveReservationAllocationTransition())
       .addTransition(RMStateStoreState.ACTIVE, RMStateStoreState.FENCED,
           RMStateStoreEventType.FENCED)
       .addTransition(RMStateStoreState.FENCED, RMStateStoreState.FENCED,
@@ -152,7 +165,10 @@ public abstract class RMStateStore extends AbstractService {
           RMStateStoreEventType.STORE_DELEGATION_TOKEN,
           RMStateStoreEventType.REMOVE_DELEGATION_TOKEN,
           RMStateStoreEventType.UPDATE_DELEGATION_TOKEN,
-          RMStateStoreEventType.UPDATE_AMRM_TOKEN));
+          RMStateStoreEventType.UPDATE_AMRM_TOKEN,
+          RMStateStoreEventType.STORE_RESERVATION,
+          RMStateStoreEventType.UPDATE_RESERVATION,
+          RMStateStoreEventType.REMOVE_RESERVATION));
 
   private final StateMachine<RMStateStoreState,
                              RMStateStoreEventType,
@@ -415,6 +431,80 @@ public abstract class RMStateStore extends AbstractService {
     }
   }
 
+  private static class StoreReservationAllocationTransition implements
+      SingleArcTransition<RMStateStore, RMStateStoreEvent> {
+    @Override
+    public void transition(RMStateStore store, RMStateStoreEvent event) {
+      if (!(event instanceof RMStateStoreStoreReservationEvent)) {
+        // should never happen
+        LOG.error("Illegal event type: " + event.getClass());
+        return;
+      }
+      RMStateStoreStoreReservationEvent reservationEvent =
+          (RMStateStoreStoreReservationEvent) event;
+      try {
+        LOG.info("Storing reservation allocation." + reservationEvent
+                .getReservationIdName());
+        store.storeReservationState(
+            reservationEvent.getReservationAllocation(),
+            reservationEvent.getPlanName(),
+            reservationEvent.getReservationIdName());
+      } catch (Exception e) {
+        LOG.error("Error while storing reservation allocation.", e);
+        store.notifyStoreOperationFailed(e);
+      }
+    }
+  }
+
+  private static class UpdateReservationAllocationTransition implements
+      SingleArcTransition<RMStateStore, RMStateStoreEvent> {
+    @Override
+    public void transition(RMStateStore store, RMStateStoreEvent event) {
+      if (!(event instanceof RMStateStoreStoreReservationEvent)) {
+        // should never happen
+        LOG.error("Illegal event type: " + event.getClass());
+        return;
+      }
+      RMStateStoreStoreReservationEvent reservationEvent =
+          (RMStateStoreStoreReservationEvent) event;
+      try {
+        LOG.info("Updating reservation allocation." + reservationEvent
+                .getReservationIdName());
+        store.updateReservationState(
+            reservationEvent.getReservationAllocation(),
+            reservationEvent.getPlanName(),
+            reservationEvent.getReservationIdName());
+      } catch (Exception e) {
+        LOG.error("Error while updating reservation allocation.", e);
+        store.notifyStoreOperationFailed(e);
+      }
+    }
+  }
+
+  private static class RemoveReservationAllocationTransition implements
+      SingleArcTransition<RMStateStore, RMStateStoreEvent> {
+    @Override
+    public void transition(RMStateStore store, RMStateStoreEvent event) {
+      if (!(event instanceof RMStateStoreStoreReservationEvent)) {
+        // should never happen
+        LOG.error("Illegal event type: " + event.getClass());
+        return;
+      }
+      RMStateStoreStoreReservationEvent reservationEvent =
+          (RMStateStoreStoreReservationEvent) event;
+      try {
+        LOG.info("Removing reservation allocation." + reservationEvent
+                .getReservationIdName());
+        store.removeReservationState(
+            reservationEvent.getPlanName(),
+            reservationEvent.getReservationIdName());
+      } catch (Exception e) {
+        LOG.error("Error while removing reservation allocation.", e);
+        store.notifyStoreOperationFailed(e);
+      }
+    }
+  }
+
   public RMStateStore() {
     super(RMStateStore.class.getName());
     ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
@@ -457,6 +547,9 @@ public abstract class RMStateStore extends AbstractService {
 
     AMRMTokenSecretManagerState amrmTokenSecretManagerState = null;
 
+    private Map<String, Map<ReservationId, ReservationAllocationStateProto>>
+        reservationState = new TreeMap<>();
+
     public Map<ApplicationId, ApplicationStateData> getApplicationState() {
       return appState;
     }
@@ -468,6 +561,11 @@ public abstract class RMStateStore extends AbstractService {
     public AMRMTokenSecretManagerState getAMRMTokenSecretManagerState() {
       return amrmTokenSecretManagerState;
     }
+
+    public Map<String, Map<ReservationId, ReservationAllocationStateProto>>
+        getReservationState() {
+      return reservationState;
+    }
   }
     
   private Dispatcher rmDispatcher;
@@ -745,6 +843,57 @@ public abstract class RMStateStore extends AbstractService {
         RMStateStoreEventType.REMOVE_MASTERKEY));
   }
 
+  /**
+   * Blocking Apis to maintain reservation state.
+   */
+  public void storeNewReservation(
+      ReservationAllocationStateProto reservationAllocation, String planName,
+      String reservationIdName) {
+    handleStoreEvent(new RMStateStoreStoreReservationEvent(
+        reservationAllocation, RMStateStoreEventType.STORE_RESERVATION,
+        planName, reservationIdName));
+  }
+
+  public void updateReservation(
+      ReservationAllocationStateProto reservationAllocation,
+      String planName, String reservationIdName) {
+    handleStoreEvent(new RMStateStoreStoreReservationEvent(
+        reservationAllocation, RMStateStoreEventType.UPDATE_RESERVATION,
+        planName, reservationIdName));
+  }
+
+  public void removeReservation(String planName, String reservationIdName) {
+    handleStoreEvent(new RMStateStoreStoreReservationEvent(
+            null, RMStateStoreEventType.REMOVE_RESERVATION,
+            planName, reservationIdName));
+  }
+
+  /**
+   * Blocking API
+   * Derived classes must implement this method to store the state of
+   * a reservation allocation.
+   */
+  protected abstract void storeReservationState(
+      ReservationAllocationStateProto reservationAllocation, String planName,
+      String reservationIdName) throws Exception;
+
+  /**
+   * Blocking API
+   * Derived classes must implement this method to remove the state of
+   * a reservation allocation.
+   */
+  protected abstract void removeReservationState(String planName,
+      String reservationIdName) throws Exception;
+
+  /**
+   * Blocking API
+   * Derived classes must implement this method to update the state of
+   * a reservation allocation.
+   */
+  protected abstract void updateReservationState(
+      ReservationAllocationStateProto reservationAllocation, String planName,
+      String reservationIdName) throws Exception;
+
   /**
    * Blocking API
    * Derived classes must implement this method to remove the state of

+ 4 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreEventType.java

@@ -32,5 +32,8 @@ public enum RMStateStoreEventType {
   STORE_DELEGATION_TOKEN,
   REMOVE_DELEGATION_TOKEN,
   UPDATE_DELEGATION_TOKEN,
-  UPDATE_AMRM_TOKEN
+  UPDATE_AMRM_TOKEN,
+  STORE_RESERVATION,
+  UPDATE_RESERVATION,
+  REMOVE_RESERVATION,
 }

+ 56 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreStoreReservationEvent.java

@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.recovery;
+
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ReservationAllocationStateProto;
+
+/**
+ * Event representing maintaining ReservationSystem state.
+ */
+public class RMStateStoreStoreReservationEvent extends RMStateStoreEvent {
+
+  private ReservationAllocationStateProto reservationAllocation;
+  private String planName;
+  private String reservationIdName;
+
+  public RMStateStoreStoreReservationEvent(RMStateStoreEventType type) {
+    super(type);
+  }
+
+  public RMStateStoreStoreReservationEvent(
+      ReservationAllocationStateProto reservationAllocationState,
+      RMStateStoreEventType type, String planName, String reservationIdName) {
+    this(type);
+    this.reservationAllocation = reservationAllocationState;
+    this.planName = planName;
+    this.reservationIdName = reservationIdName;
+  }
+
+  public ReservationAllocationStateProto getReservationAllocation() {
+    return reservationAllocation;
+  }
+
+  public String getPlanName() {
+    return planName;
+  }
+
+  public String getReservationIdName() {
+    return reservationIdName;
+  }
+}

+ 123 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java

@@ -26,6 +26,7 @@ import java.nio.charset.Charset;
 import java.security.NoSuchAlgorithmException;
 import java.security.SecureRandom;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 
 import com.google.common.base.Preconditions;
@@ -45,6 +46,7 @@ import org.apache.hadoop.security.token.delegation.DelegationKey;
 import org.apache.hadoop.util.ZKUtil;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ReservationId;
 import org.apache.hadoop.yarn.conf.HAUtil;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
@@ -53,6 +55,7 @@ import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.AMRM
 import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ApplicationAttemptStateDataProto;
 import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ApplicationStateDataProto;
 import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.EpochProto;
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ReservationAllocationStateProto;
 import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
 import org.apache.hadoop.yarn.server.records.Version;
 import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
@@ -107,9 +110,18 @@ import com.google.common.annotations.VisibleForTesting;
  *        |----- currentMasterKey
  *        |----- nextMasterKey
  *
+ * |-- RESERVATION_SYSTEM_ROOT
+ *        |------PLAN_1
+ *        |      |------ RESERVATION_1
+ *        |      |------ RESERVATION_2
+ *        |      ....
+ *        |------PLAN_2
+ *        ....
  * Note: Changes from 1.1 to 1.2 - AMRMTokenSecretManager state has been saved
  * separately. The currentMasterkey and nextMasterkey have been stored.
  * Also, AMRMToken has been removed from ApplicationAttemptState.
+ *
+ * Changes from 1.2 to 1.3, Addition of ReservationSystem state.
  */
 @Private
 @Unstable
@@ -120,7 +132,7 @@ public class ZKRMStateStore extends RMStateStore {
 
   protected static final String ROOT_ZNODE_NAME = "ZKRMStateRoot";
   protected static final Version CURRENT_VERSION_INFO = Version
-      .newInstance(1, 2);
+      .newInstance(1, 3);
   private static final String RM_DELEGATION_TOKENS_ROOT_ZNODE_NAME =
       "RMDelegationTokensRoot";
   private static final String RM_DT_SEQUENTIAL_NUMBER_ZNODE_NAME =
@@ -142,6 +154,7 @@ public class ZKRMStateStore extends RMStateStore {
   private String delegationTokensRootPath;
   private String dtSequenceNumberPath;
   private String amrmTokenSecretManagerRoot;
+  private String reservationRoot;
   @VisibleForTesting
   protected String znodeWorkingPath;
 
@@ -258,6 +271,7 @@ public class ZKRMStateStore extends RMStateStore {
         RM_DT_SEQUENTIAL_NUMBER_ZNODE_NAME);
     amrmTokenSecretManagerRoot =
         getNodePath(zkRootNodePath, AMRMTOKEN_SECRET_MANAGER_ROOT);
+    reservationRoot = getNodePath(zkRootNodePath, RESERVATION_SYSTEM_ROOT);
   }
 
   @Override
@@ -279,6 +293,7 @@ public class ZKRMStateStore extends RMStateStore {
     create(delegationTokensRootPath);
     create(dtSequenceNumberPath);
     create(amrmTokenSecretManagerRoot);
+    create(reservationRoot);
   }
 
   private void logRootNodeAcls(String prefix) throws Exception {
@@ -375,9 +390,41 @@ public class ZKRMStateStore extends RMStateStore {
     loadRMAppState(rmState);
     // recover AMRMTokenSecretManager
     loadAMRMTokenSecretManagerState(rmState);
+    // recover reservation state
+    loadReservationSystemState(rmState);
     return rmState;
   }
 
+  private void loadReservationSystemState(RMState rmState) throws Exception {
+    List<String> planNodes = getChildren(reservationRoot);
+    for (String planName : planNodes) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Loading plan from znode: " + planName);
+      }
+      String planNodePath = getNodePath(reservationRoot, planName);
+
+      List<String> reservationNodes = getChildren(planNodePath);
+      for (String reservationNodeName : reservationNodes) {
+        String reservationNodePath = getNodePath(planNodePath,
+            reservationNodeName);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Loading reservation from znode: " + reservationNodePath);
+        }
+        byte[] reservationData = getData(reservationNodePath);
+        ReservationAllocationStateProto allocationState =
+            ReservationAllocationStateProto.parseFrom(reservationData);
+        if (!rmState.getReservationState().containsKey(planName)) {
+          rmState.getReservationState().put(planName,
+              new HashMap<ReservationId, ReservationAllocationStateProto>());
+        }
+        ReservationId reservationId =
+            ReservationId.parseReservationId(reservationNodeName);
+        rmState.getReservationState().get(planName).put(reservationId,
+            allocationState);
+      }
+    }
+  }
+
   private void loadAMRMTokenSecretManagerState(RMState rmState)
       throws Exception {
     byte[] data = getData(amrmTokenSecretManagerRoot);
@@ -763,6 +810,81 @@ public class ZKRMStateStore extends RMStateStore {
     safeSetData(amrmTokenSecretManagerRoot, stateData, -1);
   }
 
+  @Override
+  protected synchronized void removeReservationState(String planName,
+      String reservationIdName)
+      throws Exception {
+    String planNodePath =
+        getNodePath(reservationRoot, planName);
+    String reservationPath = getNodePath(planNodePath,
+        reservationIdName);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Removing reservationallocation " + reservationIdName + " for" +
+          " plan " + planName);
+    }
+    safeDelete(reservationPath);
+
+    List<String> reservationNodes = getChildren(planNodePath);
+    if (reservationNodes.isEmpty()) {
+      safeDelete(planNodePath);
+    }
+  }
+
+  @Override
+  protected synchronized void storeReservationState(
+      ReservationAllocationStateProto reservationAllocation, String planName,
+      String reservationIdName)
+      throws Exception {
+    SafeTransaction trx = new SafeTransaction();
+    addOrUpdateReservationState(
+        reservationAllocation, planName, reservationIdName, trx, false);
+    trx.commit();
+  }
+
+  @Override
+  protected synchronized void updateReservationState(
+      ReservationAllocationStateProto reservationAllocation, String planName,
+      String reservationIdName)
+      throws Exception {
+    SafeTransaction trx = new SafeTransaction();
+    addOrUpdateReservationState(
+        reservationAllocation, planName, reservationIdName, trx, true);
+    trx.commit();
+  }
+
+  private void addOrUpdateReservationState(
+      ReservationAllocationStateProto reservationAllocation, String planName,
+      String reservationIdName, SafeTransaction trx, boolean isUpdate)
+      throws Exception {
+    String planCreatePath =
+        getNodePath(reservationRoot, planName);
+    String reservationPath = getNodePath(planCreatePath,
+        reservationIdName);
+    byte[] reservationData = reservationAllocation.toByteArray();
+
+    if (!exists(planCreatePath)) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Creating plan node: " + planName + " at: " + planCreatePath);
+      }
+      trx.create(planCreatePath, null, zkAcl, CreateMode.PERSISTENT);
+    }
+
+    if (isUpdate) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Updating reservation: " + reservationIdName + " in plan:"
+            + planName + " at: " + reservationPath);
+      }
+      trx.setData(reservationPath, reservationData, -1);
+    } else {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Storing reservation: " + reservationIdName + " in plan:"
+            + planName + " at: " + reservationPath);
+      }
+      trx.create(reservationPath, reservationData, zkAcl,
+          CreateMode.PERSISTENT);
+    }
+  }
+
   /**
    * Utility function to ensure that the configured base znode exists.
    * This recursively creates the znode as well as all of its parents.

+ 101 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemUtil.java

@@ -18,11 +18,23 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.reservation;
 
+import org.apache.hadoop.yarn.api.records.ReservationDefinition;
+import org.apache.hadoop.yarn.api.records.ReservationId;
 import org.apache.hadoop.yarn.api.records.ReservationRequest;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.impl.pb.ReservationDefinitionPBImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.ReservationIdPBImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl;
+import org.apache.hadoop.yarn.proto.YarnProtos;
+import org.apache.hadoop.yarn.proto.YarnProtos.ReservationDefinitionProto;
+import org.apache.hadoop.yarn.proto.YarnProtos.ReservationIdProto;
+import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto;
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ReservationAllocationStateProto;
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ResourceAllocationRequestProto;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.Resources;
-
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 /**
@@ -52,4 +64,92 @@ public final class ReservationSystemUtil {
     }
     return resources;
   }
+
+  public static ReservationAllocationStateProto buildStateProto(
+      ReservationAllocation allocation) {
+    ReservationAllocationStateProto.Builder builder =
+        ReservationAllocationStateProto.newBuilder();
+
+    builder.setAcceptanceTimestamp(allocation.getAcceptanceTime());
+    builder.setContainsGangs(allocation.containsGangs());
+    builder.setStartTime(allocation.getStartTime());
+    builder.setEndTime(allocation.getEndTime());
+    builder.setUser(allocation.getUser());
+    ReservationDefinitionProto definitionProto = convertToProtoFormat(
+        allocation.getReservationDefinition());
+    builder.setReservationDefinition(definitionProto);
+
+    for (Map.Entry<ReservationInterval, Resource> entry :
+        allocation.getAllocationRequests().entrySet()) {
+      ResourceAllocationRequestProto p =
+          ResourceAllocationRequestProto.newBuilder()
+          .setStartTime(entry.getKey().getStartTime())
+          .setEndTime(entry.getKey().getEndTime())
+          .setResource(convertToProtoFormat(entry.getValue()))
+          .build();
+      builder.addAllocationRequests(p);
+    }
+
+    ReservationAllocationStateProto allocationProto = builder.build();
+    return allocationProto;
+  }
+
+  private static ReservationDefinitionProto convertToProtoFormat(
+      ReservationDefinition reservationDefinition) {
+    return ((ReservationDefinitionPBImpl)reservationDefinition).getProto();
+  }
+
+  public static ResourceProto convertToProtoFormat(Resource e) {
+    return YarnProtos.ResourceProto.newBuilder()
+        .setMemory(e.getMemory())
+        .setVirtualCores(e.getVirtualCores())
+        .build();
+  }
+
+  public static Map<ReservationInterval, Resource> toAllocations(
+      List<ResourceAllocationRequestProto> allocationRequestsList) {
+    Map<ReservationInterval, Resource> allocations = new HashMap<>();
+    for (ResourceAllocationRequestProto proto : allocationRequestsList) {
+      allocations.put(
+          new ReservationInterval(proto.getStartTime(), proto.getEndTime()),
+          convertFromProtoFormat(proto.getResource()));
+    }
+    return allocations;
+  }
+
+  private static ResourcePBImpl convertFromProtoFormat(ResourceProto resource) {
+    return new ResourcePBImpl(resource);
+  }
+
+  public static ReservationDefinitionPBImpl convertFromProtoFormat(
+      ReservationDefinitionProto r) {
+    return new ReservationDefinitionPBImpl(r);
+  }
+
+  public static ReservationIdPBImpl convertFromProtoFormat(
+      ReservationIdProto r) {
+    return new ReservationIdPBImpl(r);
+  }
+
+  public static ReservationId toReservationId(
+      ReservationIdProto reservationId) {
+    return new ReservationIdPBImpl(reservationId);
+  }
+
+  public static InMemoryReservationAllocation toInMemoryAllocation(
+      String planName, ReservationId reservationId,
+      ReservationAllocationStateProto allocationState, Resource minAlloc,
+      ResourceCalculator planResourceCalculator) {
+    ReservationDefinition definition =
+        convertFromProtoFormat(
+            allocationState.getReservationDefinition());
+    Map<ReservationInterval, Resource> allocations = toAllocations(
+            allocationState.getAllocationRequestsList());
+    InMemoryReservationAllocation allocation =
+        new InMemoryReservationAllocation(reservationId, definition,
+        allocationState.getUser(), planName, allocationState.getStartTime(),
+        allocationState.getEndTime(), allocations, planResourceCalculator,
+        minAlloc, allocationState.getContainsGangs());
+    return allocation;
+  }
 }

+ 17 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/proto/yarn_server_resourcemanager_recovery.proto

@@ -96,4 +96,20 @@ message AMRMTokenSecretManagerStateProto {
 message RMDelegationTokenIdentifierDataProto {
   optional YARNDelegationTokenIdentifierProto token_identifier = 1;
   optional int64 renewDate = 2;
-}
+}
+
+message ResourceAllocationRequestProto {
+  optional int64 start_time = 1;
+  optional int64 end_time = 2;
+  optional ResourceProto resource = 3;
+}
+
+message ReservationAllocationStateProto {
+  optional ReservationDefinitionProto reservation_definition = 1;
+  repeated ResourceAllocationRequestProto allocation_requests = 2;
+  optional int64 start_time = 3;
+  optional int64 end_time = 4;
+  optional string user = 5;
+  optional bool contains_gangs = 6;
+  optional int64 acceptance_timestamp = 7;
+}

+ 186 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java

@@ -18,6 +18,9 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.recovery;
 
+import org.apache.hadoop.yarn.api.records.ReservationDefinition;
+import org.apache.hadoop.yarn.api.records.ReservationId;
+import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.event.Event;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
@@ -36,6 +39,11 @@ import java.util.Map;
 
 import javax.crypto.SecretKey;
 
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ReservationAllocationStateProto;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystemTestUtil;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystemUtil;
+import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.junit.Assert;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -63,6 +71,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.Applicatio
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMDTSecretManagerState;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.AMRMTokenSecretManagerState;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.InMemoryReservationAllocation;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationAllocation;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AggregateAppResourceUsage;
@@ -691,4 +701,180 @@ public class RMStateStoreTestBase {
 
     store.close();
   }
+
+  public void testReservationStateStore(
+      RMStateStoreHelper stateStoreHelper) throws Exception {
+    RMStateStore store = stateStoreHelper.getRMStateStore();
+    TestDispatcher dispatcher = new TestDispatcher();
+    store.setRMDispatcher(dispatcher);
+
+    RMContext rmContext = mock(RMContext.class);
+    when(rmContext.getStateStore()).thenReturn(store);
+
+    long ts = System.currentTimeMillis();
+    ReservationId r1 = ReservationId.newInstance(ts, 1);
+    int start = 1;
+    int[] alloc = { 10, 10, 10, 10, 10 };
+    ResourceCalculator res = new DefaultResourceCalculator();
+    Resource minAlloc = Resource.newInstance(1024, 1);
+    boolean hasGang = true;
+    String planName = "dedicated";
+    ReservationDefinition rDef =
+        ReservationSystemTestUtil.createSimpleReservationDefinition(
+            start, start + alloc.length + 1, alloc.length);
+    ReservationAllocation allocation = new InMemoryReservationAllocation(
+        r1, rDef, "u3", planName, 0, 0 + alloc.length,
+        ReservationSystemTestUtil.generateAllocation(0L, 1L, alloc), res,
+        minAlloc, hasGang);
+    ReservationAllocationStateProto allocationStateProto =
+        ReservationSystemUtil.buildStateProto(allocation);
+    assertAllocationStateEqual(allocation, allocationStateProto);
+
+    // 1. Load empty store and verify no errors
+    store = stateStoreHelper.getRMStateStore();
+    when(rmContext.getStateStore()).thenReturn(store);
+    store.setRMDispatcher(dispatcher);
+    RMState state = store.loadState();
+    Map<String, Map<ReservationId, ReservationAllocationStateProto>>
+      reservationState = state.getReservationState();
+    Assert.assertNotNull(reservationState);
+
+    // 2. Store single reservation and verify
+    String reservationIdName = r1.toString();
+    rmContext.getStateStore().storeNewReservation(
+        allocationStateProto,
+        planName, reservationIdName);
+
+
+    // load state and verify new state
+    validateStoredReservation(
+        stateStoreHelper, dispatcher, rmContext, r1, planName, allocation,
+        allocationStateProto);
+
+    // 3. update state test
+    alloc = new int[]{6, 6, 6};
+    hasGang = false;
+    allocation = new InMemoryReservationAllocation(
+        r1, rDef, "u3", planName, 2, 2 + alloc.length,
+        ReservationSystemTestUtil.generateAllocation(1L, 2L, alloc), res,
+        minAlloc, hasGang);
+    allocationStateProto =
+        ReservationSystemUtil.buildStateProto(allocation);
+    rmContext.getStateStore().updateReservation(
+        allocationStateProto,
+        planName, reservationIdName);
+
+    // load state and verify updated reservation
+    validateStoredReservation(
+        stateStoreHelper, dispatcher, rmContext, r1, planName, allocation,
+        allocationStateProto);
+
+    // 4. add a second one and remove the first one
+    ReservationId r2 = ReservationId.newInstance(ts, 2);
+    ReservationAllocation allocation2 = new InMemoryReservationAllocation(
+        r2, rDef, "u3", planName, 0, 0 + alloc.length,
+        ReservationSystemTestUtil.generateAllocation(0L, 1L, alloc), res,
+        minAlloc, hasGang);
+    ReservationAllocationStateProto allocationStateProto2 =
+        ReservationSystemUtil.buildStateProto(allocation2);
+    String reservationIdName2 = r2.toString();
+
+    rmContext.getStateStore().storeNewReservation(
+        allocationStateProto2,
+        planName, reservationIdName2);
+    rmContext.getStateStore().removeReservation(planName, reservationIdName);
+
+    // load state and verify r1 is removed and r2 is still there
+    Map<ReservationId, ReservationAllocationStateProto> reservations;
+
+    store = stateStoreHelper.getRMStateStore();
+    when(rmContext.getStateStore()).thenReturn(store);
+    store.setRMDispatcher(dispatcher);
+    state = store.loadState();
+    reservationState = state.getReservationState();
+    Assert.assertNotNull(reservationState);
+    reservations = reservationState.get(planName);
+    Assert.assertNotNull(reservations);
+    ReservationAllocationStateProto storedReservationAllocation =
+        reservations.get(r1);
+    Assert.assertNull("Removed reservation should not be available in store",
+        storedReservationAllocation);
+
+    storedReservationAllocation = reservations.get(r2);
+    assertAllocationStateEqual(
+        allocationStateProto2, storedReservationAllocation);
+    assertAllocationStateEqual(allocation2, storedReservationAllocation);
+
+
+    // 5. remove last reservation removes the plan state
+    rmContext.getStateStore().removeReservation(planName, reservationIdName2);
+
+    store = stateStoreHelper.getRMStateStore();
+    when(rmContext.getStateStore()).thenReturn(store);
+    store.setRMDispatcher(dispatcher);
+    state = store.loadState();
+    reservationState = state.getReservationState();
+    Assert.assertNotNull(reservationState);
+    reservations = reservationState.get(planName);
+    Assert.assertNull(reservations);
+  }
+
+  private void validateStoredReservation(
+      RMStateStoreHelper stateStoreHelper, TestDispatcher dispatcher,
+      RMContext rmContext, ReservationId r1, String planName,
+      ReservationAllocation allocation,
+      ReservationAllocationStateProto allocationStateProto) throws Exception {
+    RMStateStore store = stateStoreHelper.getRMStateStore();
+    when(rmContext.getStateStore()).thenReturn(store);
+    store.setRMDispatcher(dispatcher);
+    RMState state = store.loadState();
+    Map<String, Map<ReservationId, ReservationAllocationStateProto>>
+        reservationState = state.getReservationState();
+    Assert.assertNotNull(reservationState);
+    Map<ReservationId, ReservationAllocationStateProto> reservations =
+        reservationState.get(planName);
+    Assert.assertNotNull(reservations);
+    ReservationAllocationStateProto storedReservationAllocation =
+        reservations.get(r1);
+    Assert.assertNotNull(storedReservationAllocation);
+
+    assertAllocationStateEqual(
+        allocationStateProto, storedReservationAllocation);
+    assertAllocationStateEqual(allocation, storedReservationAllocation);
+  }
+
+  void assertAllocationStateEqual(
+      ReservationAllocationStateProto expected,
+      ReservationAllocationStateProto actual) {
+
+    Assert.assertEquals(
+        expected.getAcceptanceTimestamp(), actual.getAcceptanceTimestamp());
+    Assert.assertEquals(expected.getStartTime(), actual.getStartTime());
+    Assert.assertEquals(expected.getEndTime(), actual.getEndTime());
+    Assert.assertEquals(expected.getContainsGangs(), actual.getContainsGangs());
+    Assert.assertEquals(expected.getUser(), actual.getUser());
+    assertEquals(
+        expected.getReservationDefinition(), actual.getReservationDefinition());
+    assertEquals(expected.getAllocationRequestsList(),
+        actual.getAllocationRequestsList());
+  }
+
+  void assertAllocationStateEqual(
+      ReservationAllocation expected,
+      ReservationAllocationStateProto actual) {
+    Assert.assertEquals(
+        expected.getAcceptanceTime(), actual.getAcceptanceTimestamp());
+    Assert.assertEquals(expected.getStartTime(), actual.getStartTime());
+    Assert.assertEquals(expected.getEndTime(), actual.getEndTime());
+    Assert.assertEquals(expected.containsGangs(), actual.getContainsGangs());
+    Assert.assertEquals(expected.getUser(), actual.getUser());
+    assertEquals(
+        expected.getReservationDefinition(),
+        ReservationSystemUtil.convertFromProtoFormat(
+            actual.getReservationDefinition()));
+    assertEquals(
+        expected.getAllocationRequests(),
+        ReservationSystemUtil.toAllocations(
+            actual.getAllocationRequestsList()));
+  }
 }

+ 1 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java

@@ -186,6 +186,7 @@ public class TestFSRMStateStore extends RMStateStoreTestBase {
       testDeleteStore(fsTester);
       testRemoveApplication(fsTester);
       testAMRMTokenSecretManagerStateStore(fsTester);
+      testReservationStateStore(fsTester);
     } finally {
       cluster.shutdown();
     }

+ 6 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestLeveldbRMStateStore.java

@@ -102,6 +102,12 @@ public class TestLeveldbRMStateStore extends RMStateStoreTestBase {
     testAMRMTokenSecretManagerStateStore(tester);
   }
 
+  @Test(timeout = 60000)
+  public void testReservation() throws Exception {
+    LeveldbStateStoreTester tester = new LeveldbStateStoreTester();
+    testReservationStateStore(tester);
+  }
+
   class LeveldbStateStoreTester implements RMStateStoreHelper {
 
     @Override

+ 1 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java

@@ -174,6 +174,7 @@ public class TestZKRMStateStore extends RMStateStoreTestBase {
     testDeleteStore(zkTester);
     testRemoveApplication(zkTester);
     testAMRMTokenSecretManagerStateStore(zkTester);
+    testReservationStateStore(zkTester);
     ((TestZKRMStateStoreTester.TestZKRMStateStoreInternal)
         zkTester.getRMStateStore()).testRetryingCreateRootDir();
   }

+ 16 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java

@@ -185,6 +185,22 @@ public class ReservationSystemTestUtil {
     return scheduler;
   }
 
+  public static ReservationDefinition createSimpleReservationDefinition(
+      long arrival, long deadline, long duration) {
+    // create a request with a single atomic ask
+    ReservationRequest r =
+        ReservationRequest.newInstance(Resource.newInstance(1024, 1), 1, 1,
+            duration);
+    ReservationDefinition rDef = new ReservationDefinitionPBImpl();
+    ReservationRequests reqs = new ReservationRequestsPBImpl();
+    reqs.setReservationResources(Collections.singletonList(r));
+    reqs.setInterpreter(ReservationRequestInterpreter.R_ALL);
+    rDef.setReservationRequests(reqs);
+    rDef.setArrival(arrival);
+    rDef.setDeadline(deadline);
+    return rDef;
+  }
+
   @SuppressWarnings("unchecked")
   public CapacityScheduler mockCapacityScheduler(int numContainers)
       throws IOException {

+ 10 - 26
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestInMemoryReservationAllocation.java

@@ -17,7 +17,6 @@
  *******************************************************************************/
 package org.apache.hadoop.yarn.server.resourcemanager.reservation;
 
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Random;
@@ -25,11 +24,7 @@ import java.util.Random;
 import org.apache.hadoop.yarn.api.records.ReservationDefinition;
 import org.apache.hadoop.yarn.api.records.ReservationId;
 import org.apache.hadoop.yarn.api.records.ReservationRequest;
-import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter;
-import org.apache.hadoop.yarn.api.records.ReservationRequests;
 import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.impl.pb.ReservationDefinitionPBImpl;
-import org.apache.hadoop.yarn.api.records.impl.pb.ReservationRequestsPBImpl;
 import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.junit.After;
@@ -67,7 +62,8 @@ public class TestInMemoryReservationAllocation {
     int[] alloc = { 10, 10, 10, 10, 10, 10 };
     int start = 100;
     ReservationDefinition rDef =
-        createSimpleReservationDefinition(start, start + alloc.length + 1,
+        ReservationSystemTestUtil.createSimpleReservationDefinition(
+            start, start + alloc.length + 1,
             alloc.length);
     Map<ReservationInterval, Resource> allocations =
         generateAllocation(start, alloc, false, false);
@@ -89,7 +85,8 @@ public class TestInMemoryReservationAllocation {
     int[] alloc = { 10, 10, 10, 10, 10, 10 };
     int start = 100;
     ReservationDefinition rDef =
-        createSimpleReservationDefinition(start, start + alloc.length + 1,
+        ReservationSystemTestUtil.createSimpleReservationDefinition(
+            start, start + alloc.length + 1,
             alloc.length);
     Map<ReservationInterval, Resource> allocations =
         generateAllocation(start, alloc, true, false);
@@ -112,7 +109,8 @@ public class TestInMemoryReservationAllocation {
     int[] alloc = { 0, 5, 10, 10, 5, 0 };
     int start = 100;
     ReservationDefinition rDef =
-        createSimpleReservationDefinition(start, start + alloc.length + 1,
+        ReservationSystemTestUtil.createSimpleReservationDefinition(
+            start, start + alloc.length + 1,
             alloc.length);
     Map<ReservationInterval, Resource> allocations =
         generateAllocation(start, alloc, true, false);
@@ -135,7 +133,8 @@ public class TestInMemoryReservationAllocation {
     int[] alloc = {};
     long start = 0;
     ReservationDefinition rDef =
-        createSimpleReservationDefinition(start, start + alloc.length + 1,
+        ReservationSystemTestUtil.createSimpleReservationDefinition(
+            start, start + alloc.length + 1,
             alloc.length);
     Map<ReservationInterval, Resource> allocations =
         new HashMap<ReservationInterval, Resource>();
@@ -154,7 +153,8 @@ public class TestInMemoryReservationAllocation {
     int[] alloc = { 10, 10, 10, 10, 10, 10 };
     int start = 100;
     ReservationDefinition rDef =
-        createSimpleReservationDefinition(start, start + alloc.length + 1,
+        ReservationSystemTestUtil.createSimpleReservationDefinition(
+            start, start + alloc.length + 1,
             alloc.length);
     boolean isGang = true;
     Map<ReservationInterval, Resource> allocations =
@@ -184,22 +184,6 @@ public class TestInMemoryReservationAllocation {
     Assert.assertEquals(start + alloc.length + 1, rAllocation.getEndTime());
   }
 
-  private ReservationDefinition createSimpleReservationDefinition(long arrival,
-      long deadline, long duration) {
-    // create a request with a single atomic ask
-    ReservationRequest r =
-        ReservationRequest.newInstance(Resource.newInstance(1024, 1), 1, 1,
-            duration);
-    ReservationDefinition rDef = new ReservationDefinitionPBImpl();
-    ReservationRequests reqs = new ReservationRequestsPBImpl();
-    reqs.setReservationResources(Collections.singletonList(r));
-    reqs.setInterpreter(ReservationRequestInterpreter.R_ALL);
-    rDef.setReservationRequests(reqs);
-    rDef.setArrival(arrival);
-    rDef.setDeadline(deadline);
-    return rDef;
-  }
-
   private Map<ReservationInterval, Resource> generateAllocation(
       int startTime, int[] alloc, boolean isStep, boolean isGang) {
     Map<ReservationInterval, Resource> req =