Quellcode durchsuchen

YARN-1341. Recover NMTokens upon nodemanager restart. (Contributed by Jason Lowe)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1611512 13f79535-47bb-0310-9956-ffa450edef68
Junping Du vor 11 Jahren
Ursprung
Commit
403ec8ea80
10 geänderte Dateien mit 561 neuen und 19 gelöschten Zeilen
  1. 3 0
      hadoop-yarn-project/CHANGES.txt
  2. 1 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/security/BaseNMTokenSecretManager.java
  3. 12 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
  4. 99 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java
  5. 28 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java
  6. 35 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java
  7. 99 17
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMTokenSecretManagerInNM.java
  8. 49 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java
  9. 81 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java
  10. 154 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/security/TestNMTokenSecretManagerInNM.java

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -46,6 +46,9 @@ Release 2.6.0 - UNRELEASED
     YARN-2228. Augmented TimelineServer to load pseudo authentication filter when
     authentication = simple. (Zhijie Shen via vinodkv)
 
+    YARN-1341. Recover NMTokens upon nodemanager restart. (Jason Lowe via 
+    junping_du)
+
   OPTIMIZATIONS
 
   BUG FIXES

+ 1 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/security/BaseNMTokenSecretManager.java

@@ -42,7 +42,7 @@ public class BaseNMTokenSecretManager extends
   private static Log LOG = LogFactory
       .getLog(BaseNMTokenSecretManager.class);
 
-  private int serialNo = new SecureRandom().nextInt();
+  protected int serialNo = new SecureRandom().nextInt();
 
   protected final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
   protected final Lock readLock = readWriteLock.readLock();

+ 12 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java

@@ -169,6 +169,15 @@ public class NodeManager extends CompositeService
     }
   }
 
+  /**
+   * Restores NM token secret-manager state from the NM state store, but only
+   * when the store reports that recovery is possible.
+   *
+   * @param nmTokenSecretManager secret manager that receives the recovered
+   *          NM token state
+   * @param containerTokenSecretManager not used yet; recovering it is still
+   *          a TODO
+   * @throws IOException if loading or applying the stored state fails
+   */
+  private void recoverTokens(NMTokenSecretManagerInNM nmTokenSecretManager,
+      NMContainerTokenSecretManager containerTokenSecretManager)
+          throws IOException {
+    if (!nmStore.canRecover()) {
+      return;
+    }
+    // TODO: recover containerTokenSecretManager
+    nmTokenSecretManager.recover(nmStore.loadNMTokenState());
+  }
+
   @Override
   protected void serviceInit(Configuration conf) throws Exception {
 
@@ -184,7 +193,9 @@ public class NodeManager extends CompositeService
         new NMContainerTokenSecretManager(conf);
 
     NMTokenSecretManagerInNM nmTokenSecretManager =
-        new NMTokenSecretManagerInNM();
+        new NMTokenSecretManagerInNM(nmStore);
+
+    recoverTokens(nmTokenSecretManager, containerTokenSecretManager);
     
     this.aclsManager = new ApplicationACLsManager(conf);
 

+ 99 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java

@@ -35,11 +35,15 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto;
 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto;
 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto;
+import org.apache.hadoop.yarn.server.api.records.MasterKey;
+import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl;
 import org.apache.hadoop.yarn.server.utils.LeveldbIterator;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.fusesource.leveldbjni.JniDBFactory;
@@ -72,6 +76,14 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
   private static final String LOCALIZATION_FILECACHE_SUFFIX = "filecache/";
   private static final String LOCALIZATION_APPCACHE_SUFFIX = "appcache/";
 
+  private static final String CURRENT_MASTER_KEY_SUFFIX = "CurrentMasterKey";
+  private static final String PREV_MASTER_KEY_SUFFIX = "PreviousMasterKey";
+  private static final String NM_TOKENS_KEY_PREFIX = "NMTokens/";
+  private static final String NM_TOKENS_CURRENT_MASTER_KEY =
+      NM_TOKENS_KEY_PREFIX + CURRENT_MASTER_KEY_SUFFIX;
+  private static final String NM_TOKENS_PREV_MASTER_KEY =
+      NM_TOKENS_KEY_PREFIX + PREV_MASTER_KEY_SUFFIX;
+
   private DB db;
 
   public NMLeveldbStateStoreService() {
@@ -367,6 +379,93 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
   }
 
 
+  /**
+   * Scans the entries stored under the NMTokens/ key prefix and rebuilds the
+   * recovered NM token state: the current master key, the previous master
+   * key, and one master key per application attempt.
+   *
+   * @return the recovered state; its application key map is never null
+   * @throws IOException if the database read fails, parsing a stored key
+   *           fails, or an application attempt entry has a malformed ID
+   */
+  @Override
+  public RecoveredNMTokenState loadNMTokenState() throws IOException {
+    RecoveredNMTokenState recovered = new RecoveredNMTokenState();
+    recovered.applicationMasterKeys =
+        new HashMap<ApplicationAttemptId, MasterKey>();
+    LeveldbIterator dbIter = null;
+    try {
+      dbIter = new LeveldbIterator(db);
+      dbIter.seek(bytes(NM_TOKENS_KEY_PREFIX));
+      while (dbIter.hasNext()) {
+        Entry<byte[], byte[]> record = dbIter.next();
+        String dbKey = asString(record.getKey());
+        if (!dbKey.startsWith(NM_TOKENS_KEY_PREFIX)) {
+          // the first key outside the NMTokens/ prefix ends the scan
+          break;
+        }
+        String suffix = dbKey.substring(NM_TOKENS_KEY_PREFIX.length());
+        if (CURRENT_MASTER_KEY_SUFFIX.equals(suffix)) {
+          recovered.currentMasterKey = parseMasterKey(record.getValue());
+          continue;
+        }
+        if (PREV_MASTER_KEY_SUFFIX.equals(suffix)) {
+          recovered.previousMasterKey = parseMasterKey(record.getValue());
+          continue;
+        }
+        if (!suffix.startsWith(
+            ApplicationAttemptId.appAttemptIdStrPrefix)) {
+          // entries under the prefix that match no known form are ignored
+          continue;
+        }
+        ApplicationAttemptId attemptId;
+        try {
+          attemptId = ConverterUtils.toApplicationAttemptId(suffix);
+        } catch (IllegalArgumentException e) {
+          throw new IOException("Bad application master key state for "
+              + dbKey, e);
+        }
+        recovered.applicationMasterKeys.put(attemptId,
+            parseMasterKey(record.getValue()));
+      }
+    } catch (DBException e) {
+      throw new IOException(e.getMessage(), e);
+    } finally {
+      if (dbIter != null) {
+        dbIter.close();
+      }
+    }
+    return recovered;
+  }
+
+  /**
+   * Writes the given key to the NMTokens/CurrentMasterKey database entry.
+   *
+   * @param key the current NM token master key to persist
+   * @throws IOException if the database write fails
+   */
+  @Override
+  public void storeNMTokenCurrentMasterKey(MasterKey key)
+      throws IOException {
+    storeMasterKey(NM_TOKENS_CURRENT_MASTER_KEY, key);
+  }
+
+  /**
+   * Writes the given key to the NMTokens/PreviousMasterKey database entry.
+   *
+   * @param key the previous NM token master key to persist
+   * @throws IOException if the database write fails
+   */
+  @Override
+  public void storeNMTokenPreviousMasterKey(MasterKey key)
+      throws IOException {
+    storeMasterKey(NM_TOKENS_PREV_MASTER_KEY, key);
+  }
+
+  /**
+   * Writes the master key for one application attempt. The database key is
+   * the NMTokens/ prefix followed by the attempt's string form.
+   *
+   * @param attempt the application attempt the key belongs to
+   * @param key the master key to persist for that attempt
+   * @throws IOException if the database write fails
+   */
+  @Override
+  public void storeNMTokenApplicationMasterKey(
+      ApplicationAttemptId attempt, MasterKey key) throws IOException {
+    storeMasterKey(NM_TOKENS_KEY_PREFIX + attempt, key);
+  }
+
+  /**
+   * Deletes the stored master key of one application attempt.
+   *
+   * @param attempt the application attempt whose key entry is deleted
+   * @throws IOException if the database delete fails
+   */
+  @Override
+  public void removeNMTokenApplicationMasterKey(
+      ApplicationAttemptId attempt) throws IOException {
+    try {
+      db.delete(bytes(NM_TOKENS_KEY_PREFIX + attempt));
+    } catch (DBException dbe) {
+      throw new IOException(dbe.getMessage(), dbe);
+    }
+  }
+
+  /**
+   * Decodes a serialized MasterKeyProto into a MasterKey record.
+   *
+   * @param keyData the serialized protobuf bytes read from the database
+   * @return a MasterKey backed by the parsed proto
+   * @throws IOException if the bytes cannot be parsed
+   */
+  private MasterKey parseMasterKey(byte[] keyData) throws IOException {
+    MasterKeyProto proto = MasterKeyProto.parseFrom(keyData);
+    return new MasterKeyPBImpl(proto);
+  }
+
+  /**
+   * Serializes a master key as protobuf bytes and writes it under the given
+   * database key.
+   *
+   * @param dbKey the database key to write to
+   * @param key the master key to store; it is cast to MasterKeyPBImpl
+   * @throws IOException if the database write fails
+   */
+  private void storeMasterKey(String dbKey, MasterKey key)
+      throws IOException {
+    MasterKeyPBImpl keyImpl = (MasterKeyPBImpl) key;
+    try {
+      byte[] rawKey = bytes(dbKey);
+      byte[] rawValue = keyImpl.getProto().toByteArray();
+      db.put(rawKey, rawValue);
+    } catch (DBException dbe) {
+      throw new IOException(dbe.getMessage(), dbe);
+    }
+  }
+
+
   @Override
   protected void initStorage(Configuration conf)
       throws IOException {

+ 28 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java

@@ -22,10 +22,12 @@ import java.io.IOException;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto;
 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto;
+import org.apache.hadoop.yarn.server.api.records.MasterKey;
 
 // The state store to use when state isn't being stored
 public class NMNullStateStoreService extends NMStateStoreService {
@@ -77,6 +79,32 @@ public class NMNullStateStoreService extends NMStateStoreService {
   public void removeDeletionTask(int taskId) throws IOException {
   }
 
+  /**
+   * Always throws: this store is used when state isn't being stored, so
+   * there is no NM token state to load.
+   *
+   * @throws UnsupportedOperationException on every call
+   */
+  @Override
+  public RecoveredNMTokenState loadNMTokenState() throws IOException {
+    throw new UnsupportedOperationException(
+        "Recovery not supported by this state store");
+  }
+
+  /** Does nothing: this store is used when state isn't being stored. */
+  @Override
+  public void storeNMTokenCurrentMasterKey(MasterKey key)
+      throws IOException {
+  }
+
+  /** Does nothing: this store is used when state isn't being stored. */
+  @Override
+  public void storeNMTokenPreviousMasterKey(MasterKey key)
+      throws IOException {
+  }
+
+  /** Does nothing: this store is used when state isn't being stored. */
+  @Override
+  public void storeNMTokenApplicationMasterKey(ApplicationAttemptId attempt,
+      MasterKey key) throws IOException {
+  }
+
+  /** Does nothing: this store is used when state isn't being stored. */
+  @Override
+  public void removeNMTokenApplicationMasterKey(ApplicationAttemptId attempt)
+      throws IOException {
+  }
+
   @Override
   protected void initStorage(Configuration conf) throws IOException {
   }

+ 35 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java

@@ -29,10 +29,12 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto;
 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto;
+import org.apache.hadoop.yarn.server.api.records.MasterKey;
 
 @Private
 @Unstable
@@ -100,6 +102,24 @@ public abstract class NMStateStoreService extends AbstractService {
     }
   }
 
+  /**
+   * Holder for the NM token state read back from a state store. The fields
+   * are package-private and are assigned directly by the state store
+   * implementations; other callers read them through the getters.
+   */
+  public static class RecoveredNMTokenState {
+    // current NM token master key
+    MasterKey currentMasterKey;
+    // previous NM token master key
+    MasterKey previousMasterKey;
+    // master key recorded for each application attempt
+    Map<ApplicationAttemptId, MasterKey> applicationMasterKeys;
+
+    /** @return the recovered current master key */
+    public MasterKey getCurrentMasterKey() {
+      return currentMasterKey;
+    }
+
+    /** @return the recovered previous master key */
+    public MasterKey getPreviousMasterKey() {
+      return previousMasterKey;
+    }
+
+    /** @return the recovered master keys, keyed by application attempt */
+    public Map<ApplicationAttemptId, MasterKey> getApplicationMasterKeys() {
+      return applicationMasterKeys;
+    }
+  }
+
   /** Initialize the state storage */
   @Override
   public void serviceInit(Configuration conf) throws IOException {
@@ -173,6 +193,21 @@ public abstract class NMStateStoreService extends AbstractService {
   public abstract void removeDeletionTask(int taskId) throws IOException;
 
 
+  /**
+   * Loads the NM token state held by this store.
+   *
+   * @return the recovered current, previous and per-attempt master keys
+   * @throws IOException if the state cannot be read
+   */
+  public abstract RecoveredNMTokenState loadNMTokenState() throws IOException;
+
+  /**
+   * Records the given key as the current NM token master key.
+   *
+   * @param key the current master key
+   * @throws IOException if the key cannot be stored
+   */
+  public abstract void storeNMTokenCurrentMasterKey(MasterKey key)
+      throws IOException;
+
+  /**
+   * Records the given key as the previous NM token master key.
+   *
+   * @param key the previous master key
+   * @throws IOException if the key cannot be stored
+   */
+  public abstract void storeNMTokenPreviousMasterKey(MasterKey key)
+      throws IOException;
+
+  /**
+   * Records the NM token master key associated with an application attempt.
+   *
+   * @param attempt the application attempt the key belongs to
+   * @param key the master key for that attempt
+   * @throws IOException if the key cannot be stored
+   */
+  public abstract void storeNMTokenApplicationMasterKey(
+      ApplicationAttemptId attempt, MasterKey key) throws IOException;
+
+  /**
+   * Removes the NM token master key recorded for an application attempt.
+   *
+   * @param attempt the application attempt whose key is removed
+   * @throws IOException if the key cannot be removed
+   */
+  public abstract void removeNMTokenApplicationMasterKey(
+      ApplicationAttemptId attempt) throws IOException;
+
+
   protected abstract void initStorage(Configuration conf) throws IOException;
 
   protected abstract void startStorage() throws IOException;

+ 99 - 17
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMTokenSecretManagerInNM.java

@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.yarn.server.nodemanager.security;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -31,6 +32,9 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.security.NMTokenIdentifier;
 import org.apache.hadoop.yarn.server.api.records.MasterKey;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredNMTokenState;
 import org.apache.hadoop.yarn.server.security.BaseNMTokenSecretManager;
 import org.apache.hadoop.yarn.server.security.MasterKeyData;
 
@@ -45,16 +49,78 @@ public class NMTokenSecretManagerInNM extends BaseNMTokenSecretManager {
   
   private final Map<ApplicationAttemptId, MasterKeyData> oldMasterKeys;
   private final Map<ApplicationId, List<ApplicationAttemptId>> appToAppAttemptMap;
+  private final NMStateStoreService stateStore;
   private NodeId nodeId;                                                      
   
-  
+  /** Creates a secret manager backed by an NMNullStateStoreService. */
   public NMTokenSecretManagerInNM() {
+    this(new NMNullStateStoreService());
+  }
+
+  /**
+   * Creates a secret manager that writes key changes to the given store.
+   *
+   * @param stateStore the state store that receives NM token key updates
+   */
+  public NMTokenSecretManagerInNM(NMStateStoreService stateStore) {
     this.oldMasterKeys =
         new HashMap<ApplicationAttemptId, MasterKeyData>();
     appToAppAttemptMap =         
         new HashMap<ApplicationId, List<ApplicationAttemptId>>();
+    this.stateStore = stateStore;
   }
   
+  /**
+   * Loads previously stored NM token state into this secret manager.
+   * Recovered current and previous master keys replace the in-memory ones,
+   * the serial number is advanced past the current key's ID, the recovered
+   * per-attempt keys are added to the old-keys map, and the application to
+   * attempts map is rebuilt from that map.
+   *
+   * @param state the state loaded from the NM state store
+   * @throws IOException declared for callers; see the method body for the
+   *           operations performed
+   */
+  public synchronized void recover(RecoveredNMTokenState state)
+      throws IOException {
+    MasterKey recoveredCurrent = state.getCurrentMasterKey();
+    if (recoveredCurrent != null) {
+      super.currentMasterKey = new MasterKeyData(recoveredCurrent,
+          createSecretKey(recoveredCurrent.getBytes().array()));
+    }
+
+    MasterKey recoveredPrevious = state.getPreviousMasterKey();
+    if (recoveredPrevious != null) {
+      previousMasterKey = new MasterKeyData(recoveredPrevious,
+          createSecretKey(recoveredPrevious.getBytes().array()));
+    }
+
+    // continue numbering after the ID of whichever key is now current
+    if (super.currentMasterKey != null) {
+      super.serialNo = super.currentMasterKey.getMasterKey().getKeyId() + 1;
+    }
+
+    Map<ApplicationAttemptId, MasterKey> recoveredAttemptKeys =
+        state.getApplicationMasterKeys();
+    for (Map.Entry<ApplicationAttemptId, MasterKey> recoveredEntry :
+         recoveredAttemptKeys.entrySet()) {
+      MasterKey attemptKey = recoveredEntry.getValue();
+      MasterKeyData attemptKeyData = new MasterKeyData(attemptKey,
+          createSecretKey(attemptKey.getBytes().array()));
+      oldMasterKeys.put(recoveredEntry.getKey(), attemptKeyData);
+    }
+
+    // rebuild the application -> attempts index from every known attempt key
+    appToAppAttemptMap.clear();
+    for (ApplicationAttemptId attemptId : oldMasterKeys.keySet()) {
+      ApplicationId appId = attemptId.getApplicationId();
+      if (!appToAppAttemptMap.containsKey(appId)) {
+        appToAppAttemptMap.put(appId, new ArrayList<ApplicationAttemptId>());
+      }
+      appToAppAttemptMap.get(appId).add(attemptId);
+    }
+  }
+
+  /**
+   * Sets the in-memory current master key and then tries to write it to the
+   * state store. A failed write is logged and otherwise ignored, so the
+   * in-memory key is updated either way.
+   *
+   * @param key the new current master key
+   */
+  private void updateCurrentMasterKey(MasterKeyData key) {
+    super.currentMasterKey = key;
+    try {
+      MasterKey rawKey = key.getMasterKey();
+      stateStore.storeNMTokenCurrentMasterKey(rawKey);
+    } catch (IOException ioe) {
+      LOG.error("Unable to update current master key in state store", ioe);
+    }
+  }
+
+  /**
+   * Sets the in-memory previous master key and then tries to write it to
+   * the state store. A failed write is logged and otherwise ignored, so the
+   * in-memory key is updated either way.
+   *
+   * @param key the new previous master key
+   */
+  private void updatePreviousMasterKey(MasterKeyData key) {
+    previousMasterKey = key;
+    try {
+      MasterKey rawKey = key.getMasterKey();
+      stateStore.storeNMTokenPreviousMasterKey(rawKey);
+    } catch (IOException ioe) {
+      LOG.error("Unable to update previous master key in state store", ioe);
+    }
+  }
+
   /**
    * Used by NodeManagers to create a token-secret-manager with the key
    * obtained from the RM. This can happen during registration or when the RM
@@ -62,20 +128,16 @@ public class NMTokenSecretManagerInNM extends BaseNMTokenSecretManager {
    */
   @Private
   public synchronized void setMasterKey(MasterKey masterKey) {
-    LOG.info("Rolling master-key for nm-tokens, got key with id :"
-        + masterKey.getKeyId());
-    if (super.currentMasterKey == null) {
-      super.currentMasterKey =
-          new MasterKeyData(masterKey, createSecretKey(masterKey.getBytes()
-            .array()));
-    } else {
-      if (super.currentMasterKey.getMasterKey().getKeyId() != masterKey
-        .getKeyId()) {
-        this.previousMasterKey = super.currentMasterKey;
-        super.currentMasterKey =
-            new MasterKeyData(masterKey, createSecretKey(masterKey.getBytes()
-              .array()));
+    // Update keys only if the key has changed.
+    if (super.currentMasterKey == null || super.currentMasterKey.getMasterKey()
+          .getKeyId() != masterKey.getKeyId()) {
+      // This class manages NM tokens, so the log line must say nm-tokens
+      // rather than container-tokens.
+      LOG.info("Rolling master-key for nm-tokens, got key with id "
+          + masterKey.getKeyId());
+      if (super.currentMasterKey != null) {
+        // Demote the outgoing key before it is replaced, so it is kept (and
+        // written to the state store) as the previous key.
+        updatePreviousMasterKey(super.currentMasterKey);
       }
+      updateCurrentMasterKey(new MasterKeyData(masterKey,
+          createSecretKey(masterKey.getBytes().array())));
     }
   }
 
@@ -128,7 +190,7 @@ public class NMTokenSecretManagerInNM extends BaseNMTokenSecretManager {
       LOG.debug("Removing application attempts NMToken keys for application "
           + appId);
       for (ApplicationAttemptId appAttemptId : appAttemptList) {
-        this.oldMasterKeys.remove(appAttemptId);
+        removeAppAttemptKey(appAttemptId);
       }
       appToAppAttemptMap.remove(appId);
     } else {
@@ -164,11 +226,11 @@ public class NMTokenSecretManagerInNM extends BaseNMTokenSecretManager {
           + identifier.getApplicationAttemptId().toString());
       if (identifier.getKeyId() == currentMasterKey.getMasterKey()
         .getKeyId()) {
-        oldMasterKeys.put(appAttemptId, currentMasterKey);
+        updateAppAttemptKey(appAttemptId, currentMasterKey);
       } else if (previousMasterKey != null
           && identifier.getKeyId() == previousMasterKey.getMasterKey()
             .getKeyId()) {
-        oldMasterKeys.put(appAttemptId, previousMasterKey);
+        updateAppAttemptKey(appAttemptId, previousMasterKey);
       } else {
         throw new InvalidToken(
           "Older NMToken should not be used while starting the container.");
@@ -193,4 +255,24 @@ public class NMTokenSecretManagerInNM extends BaseNMTokenSecretManager {
   public synchronized NodeId getNodeId() {
     return this.nodeId;
   }
+
+  /**
+   * Records the master key for an application attempt in memory and then
+   * tries to write it to the state store. A failed write is logged and
+   * otherwise ignored.
+   *
+   * @param attempt the application attempt the key belongs to
+   * @param key the master key to associate with the attempt
+   */
+  private void updateAppAttemptKey(ApplicationAttemptId attempt,
+      MasterKeyData key) {
+    this.oldMasterKeys.put(attempt, key);
+    try {
+      MasterKey attemptKey = key.getMasterKey();
+      stateStore.storeNMTokenApplicationMasterKey(attempt, attemptKey);
+    } catch (IOException ioe) {
+      LOG.error("Unable to store master key for application " + attempt, ioe);
+    }
+  }
+
+  /**
+   * Drops the master key of an application attempt from memory and then
+   * tries to remove it from the state store. A failed removal is logged and
+   * otherwise ignored.
+   *
+   * @param attempt the application attempt whose key is removed
+   */
+  private void removeAppAttemptKey(ApplicationAttemptId attempt) {
+    this.oldMasterKeys.remove(attempt);
+    try {
+      stateStore.removeNMTokenApplicationMasterKey(attempt);
+    } catch (IOException e) {
+      LOG.error("Unable to remove master key for application " + attempt, e);
+    }
+  }
 }

+ 49 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java

@@ -25,14 +25,18 @@ import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto;
 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto;
+import org.apache.hadoop.yarn.server.api.records.MasterKey;
+import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl;
 
 public class NMMemoryStateStoreService extends NMStateStoreService {
   private Map<TrackerKey, TrackerState> trackerStates;
   private Map<Integer, DeletionServiceDeleteTaskProto> deleteTasks;
+  private RecoveredNMTokenState nmTokenState;
 
   public NMMemoryStateStoreService() {
     super(NMMemoryStateStoreService.class.getName());
@@ -113,8 +117,12 @@ public class NMMemoryStateStoreService extends NMStateStoreService {
 
   @Override
   protected void initStorage(Configuration conf) {
+    // start with empty NM token state: no master keys, empty attempt map
+    nmTokenState = new RecoveredNMTokenState();
+    nmTokenState.applicationMasterKeys =
+        new HashMap<ApplicationAttemptId, MasterKey>();
     trackerStates = new HashMap<TrackerKey, TrackerState>();
     deleteTasks = new HashMap<Integer, DeletionServiceDeleteTaskProto>();
+
   }
 
   @Override
@@ -148,6 +156,47 @@ public class NMMemoryStateStoreService extends NMStateStoreService {
   }
 
 
+  /**
+   * Returns a snapshot of the in-memory NM token state. The holder and its
+   * attempt-key map are fresh objects, so the caller cannot modify the state
+   * kept by this store.
+   *
+   * @return a copy of the stored NM token state
+   */
+  @Override
+  public RecoveredNMTokenState loadNMTokenState() throws IOException {
+    RecoveredNMTokenState snapshot = new RecoveredNMTokenState();
+    snapshot.applicationMasterKeys =
+        new HashMap<ApplicationAttemptId, MasterKey>(
+            nmTokenState.applicationMasterKeys);
+    snapshot.previousMasterKey = nmTokenState.previousMasterKey;
+    snapshot.currentMasterKey = nmTokenState.currentMasterKey;
+    return snapshot;
+  }
+
+  /**
+   * Stores a new MasterKeyPBImpl built from the given key's proto as the
+   * current master key.
+   *
+   * @param key the current master key; it is cast to MasterKeyPBImpl
+   */
+  @Override
+  public void storeNMTokenCurrentMasterKey(MasterKey key)
+      throws IOException {
+    nmTokenState.currentMasterKey =
+        new MasterKeyPBImpl(((MasterKeyPBImpl) key).getProto());
+  }
+
+  /**
+   * Stores a new MasterKeyPBImpl built from the given key's proto as the
+   * previous master key.
+   *
+   * @param key the previous master key; it is cast to MasterKeyPBImpl
+   */
+  @Override
+  public void storeNMTokenPreviousMasterKey(MasterKey key)
+      throws IOException {
+    nmTokenState.previousMasterKey =
+        new MasterKeyPBImpl(((MasterKeyPBImpl) key).getProto());
+  }
+
+  /**
+   * Stores a new MasterKeyPBImpl built from the given key's proto as the
+   * master key of an application attempt.
+   *
+   * @param attempt the application attempt the key belongs to
+   * @param key the attempt's master key; it is cast to MasterKeyPBImpl
+   */
+  @Override
+  public void storeNMTokenApplicationMasterKey(ApplicationAttemptId attempt,
+      MasterKey key) throws IOException {
+    MasterKey storedKey =
+        new MasterKeyPBImpl(((MasterKeyPBImpl) key).getProto());
+    nmTokenState.applicationMasterKeys.put(attempt, storedKey);
+  }
+
+  /**
+   * Removes the in-memory master key entry of an application attempt.
+   *
+   * @param attempt the application attempt whose key is removed
+   */
+  @Override
+  public void removeNMTokenApplicationMasterKey(ApplicationAttemptId attempt)
+      throws IOException {
+    nmTokenState.applicationMasterKeys.remove(attempt);
+  }
+
+
   private static class TrackerState {
     Map<Path, LocalResourceProto> inProgressMap =
         new HashMap<Path, LocalResourceProto>();

+ 81 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java

@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.nodemanager.recovery;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 import java.io.File;
@@ -28,6 +29,7 @@ import java.util.Map;
 
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.LocalResourceType;
@@ -37,10 +39,13 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto;
 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto;
+import org.apache.hadoop.yarn.server.api.records.MasterKey;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.LocalResourceTrackerState;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredDeletionServiceState;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredLocalizationState;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredNMTokenState;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredUserResources;
+import org.apache.hadoop.yarn.server.security.BaseNMTokenSecretManager;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.junit.After;
 import org.junit.Before;
@@ -460,4 +465,80 @@ public class TestNMLeveldbStateStoreService {
     state = stateStore.loadDeletionServiceState();
     assertTrue(state.getTasks().isEmpty());
   }
+
+  /**
+   * Checks that NM token state survives a state store restart. The test
+   * stores the current key, the previous key and per-attempt keys in stages,
+   * restarts the store after each stage, and compares what is loaded back
+   * with what was stored. The last stage mixes adds, overwrites and a
+   * removal before the final restart.
+   */
+  @Test
+  public void testNMTokenStorage() throws IOException {
+    // test empty when no state
+    RecoveredNMTokenState state = stateStore.loadNMTokenState();
+    assertNull(state.getCurrentMasterKey());
+    assertNull(state.getPreviousMasterKey());
+    assertTrue(state.getApplicationMasterKeys().isEmpty());
+
+    // store a master key and verify recovered
+    NMTokenSecretManagerForTest secretMgr = new NMTokenSecretManagerForTest();
+    MasterKey currentKey = secretMgr.generateKey();
+    stateStore.storeNMTokenCurrentMasterKey(currentKey);
+    restartStateStore();
+    state = stateStore.loadNMTokenState();
+    assertEquals(currentKey, state.getCurrentMasterKey());
+    assertNull(state.getPreviousMasterKey());
+    assertTrue(state.getApplicationMasterKeys().isEmpty());
+
+    // store a previous key and verify recovered
+    MasterKey prevKey = secretMgr.generateKey();
+    stateStore.storeNMTokenPreviousMasterKey(prevKey);
+    restartStateStore();
+    state = stateStore.loadNMTokenState();
+    assertEquals(currentKey, state.getCurrentMasterKey());
+    assertEquals(prevKey, state.getPreviousMasterKey());
+    assertTrue(state.getApplicationMasterKeys().isEmpty());
+
+    // store a few application keys and verify recovered
+    ApplicationAttemptId attempt1 = ApplicationAttemptId.newInstance(
+        ApplicationId.newInstance(1, 1), 1);
+    MasterKey attemptKey1 = secretMgr.generateKey();
+    stateStore.storeNMTokenApplicationMasterKey(attempt1, attemptKey1);
+    ApplicationAttemptId attempt2 = ApplicationAttemptId.newInstance(
+        ApplicationId.newInstance(2, 3), 4);
+    MasterKey attemptKey2 = secretMgr.generateKey();
+    stateStore.storeNMTokenApplicationMasterKey(attempt2, attemptKey2);
+    restartStateStore();
+    state = stateStore.loadNMTokenState();
+    assertEquals(currentKey, state.getCurrentMasterKey());
+    assertEquals(prevKey, state.getPreviousMasterKey());
+    Map<ApplicationAttemptId, MasterKey> loadedAppKeys =
+        state.getApplicationMasterKeys();
+    assertEquals(2, loadedAppKeys.size());
+    assertEquals(attemptKey1, loadedAppKeys.get(attempt1));
+    assertEquals(attemptKey2, loadedAppKeys.get(attempt2));
+
+    // add/update/remove keys and verify recovered
+    ApplicationAttemptId attempt3 = ApplicationAttemptId.newInstance(
+        ApplicationId.newInstance(5, 6), 7);
+    MasterKey attemptKey3 = secretMgr.generateKey();
+    stateStore.storeNMTokenApplicationMasterKey(attempt3, attemptKey3);
+    stateStore.removeNMTokenApplicationMasterKey(attempt1);
+    attemptKey2 = prevKey;
+    stateStore.storeNMTokenApplicationMasterKey(attempt2, attemptKey2);
+    prevKey = currentKey;
+    stateStore.storeNMTokenPreviousMasterKey(prevKey);
+    currentKey = secretMgr.generateKey();
+    stateStore.storeNMTokenCurrentMasterKey(currentKey);
+    restartStateStore();
+    state = stateStore.loadNMTokenState();
+    assertEquals(currentKey, state.getCurrentMasterKey());
+    assertEquals(prevKey, state.getPreviousMasterKey());
+    loadedAppKeys = state.getApplicationMasterKeys();
+    assertEquals(2, loadedAppKeys.size());
+    assertNull(loadedAppKeys.get(attempt1));
+    assertEquals(attemptKey2, loadedAppKeys.get(attempt2));
+    assertEquals(attemptKey3, loadedAppKeys.get(attempt3));
+  }
+
+  /**
+   * Test-only secret manager that exposes master key generation from
+   * BaseNMTokenSecretManager so the test can create keys to store.
+   */
+  private static class NMTokenSecretManagerForTest extends
+      BaseNMTokenSecretManager {
+    /** @return the MasterKey of a newly created master key */
+    public MasterKey generateKey() {
+      return createNewMasterKey().getMasterKey();
+    }
+  }
 }

+ 154 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/security/TestNMTokenSecretManagerInNM.java

@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.security;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.token.SecretManager.InvalidToken;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.security.NMTokenIdentifier;
+import org.apache.hadoop.yarn.server.api.records.MasterKey;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMMemoryStateStoreService;
+import org.apache.hadoop.yarn.server.security.BaseNMTokenSecretManager;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.junit.Test;
+
+public class TestNMTokenSecretManagerInNM {
+
+  @Test
+  public void testRecovery() throws IOException {
+    YarnConfiguration conf = new YarnConfiguration();
+    conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, true);
+    final NodeId nodeId = NodeId.newInstance("somehost", 1234);
+    final ApplicationAttemptId attempt1 =
+        ApplicationAttemptId.newInstance(ApplicationId.newInstance(1, 1), 1);
+    final ApplicationAttemptId attempt2 =
+        ApplicationAttemptId.newInstance(ApplicationId.newInstance(2, 2), 2);
+    NMTokenKeyGeneratorForTest keygen = new NMTokenKeyGeneratorForTest();
+    NMMemoryStateStoreService stateStore = new NMMemoryStateStoreService();
+    stateStore.init(conf);
+    stateStore.start();
+    NMTokenSecretManagerInNM secretMgr =
+        new NMTokenSecretManagerInNM(stateStore);
+    secretMgr.setNodeId(nodeId);
+    MasterKey currentKey = keygen.generateKey();
+    secretMgr.setMasterKey(currentKey);
+    NMTokenIdentifier attemptToken1 =
+        getNMTokenId(secretMgr.createNMToken(attempt1, nodeId, "user1"));
+    NMTokenIdentifier attemptToken2 =
+        getNMTokenId(secretMgr.createNMToken(attempt2, nodeId, "user2"));
+    secretMgr.appAttemptStartContainer(attemptToken1);
+    secretMgr.appAttemptStartContainer(attemptToken2);
+    assertTrue(secretMgr.isAppAttemptNMTokenKeyPresent(attempt1));
+    assertTrue(secretMgr.isAppAttemptNMTokenKeyPresent(attempt2));
+    assertNotNull(secretMgr.retrievePassword(attemptToken1));
+    assertNotNull(secretMgr.retrievePassword(attemptToken2));
+
+    // restart and verify key is still there and token still valid
+    secretMgr = new NMTokenSecretManagerInNM(stateStore);
+    secretMgr.recover(stateStore.loadNMTokenState());
+    secretMgr.setNodeId(nodeId);
+    assertEquals(currentKey, secretMgr.getCurrentKey());
+    assertTrue(secretMgr.isAppAttemptNMTokenKeyPresent(attempt1));
+    assertTrue(secretMgr.isAppAttemptNMTokenKeyPresent(attempt2));
+    assertNotNull(secretMgr.retrievePassword(attemptToken1));
+    assertNotNull(secretMgr.retrievePassword(attemptToken2));
+
+    // roll master key and remove an app
+    currentKey = keygen.generateKey();
+    secretMgr.setMasterKey(currentKey);
+    secretMgr.appFinished(attempt1.getApplicationId());
+
+    // restart and verify attempt1 key is still valid due to prev key persist
+    secretMgr = new NMTokenSecretManagerInNM(stateStore);
+    secretMgr.recover(stateStore.loadNMTokenState());
+    secretMgr.setNodeId(nodeId);
+    assertEquals(currentKey, secretMgr.getCurrentKey());
+    assertFalse(secretMgr.isAppAttemptNMTokenKeyPresent(attempt1));
+    assertTrue(secretMgr.isAppAttemptNMTokenKeyPresent(attempt2));
+    assertNotNull(secretMgr.retrievePassword(attemptToken1));
+    assertNotNull(secretMgr.retrievePassword(attemptToken2));
+
+    // roll master key again, restart, and verify attempt1 key is bad but
+    // attempt2 is still good due to app key persist
+    currentKey = keygen.generateKey();
+    secretMgr.setMasterKey(currentKey);
+    secretMgr = new NMTokenSecretManagerInNM(stateStore);
+    secretMgr.recover(stateStore.loadNMTokenState());
+    secretMgr.setNodeId(nodeId);
+    assertEquals(currentKey, secretMgr.getCurrentKey());
+    assertFalse(secretMgr.isAppAttemptNMTokenKeyPresent(attempt1));
+    assertTrue(secretMgr.isAppAttemptNMTokenKeyPresent(attempt2));
+    try {
+      secretMgr.retrievePassword(attemptToken1);
+      fail("attempt token should not still be valid");
+    } catch (InvalidToken e) {
+      // expected
+    }
+    assertNotNull(secretMgr.retrievePassword(attemptToken2));
+
+    // remove last attempt, restart, verify both tokens are now bad
+    secretMgr.appFinished(attempt2.getApplicationId());
+    secretMgr = new NMTokenSecretManagerInNM(stateStore);
+    secretMgr.recover(stateStore.loadNMTokenState());
+    secretMgr.setNodeId(nodeId);
+    assertEquals(currentKey, secretMgr.getCurrentKey());
+    assertFalse(secretMgr.isAppAttemptNMTokenKeyPresent(attempt1));
+    assertFalse(secretMgr.isAppAttemptNMTokenKeyPresent(attempt2));
+    try {
+      secretMgr.retrievePassword(attemptToken1);
+      fail("attempt token should not still be valid");
+    } catch (InvalidToken e) {
+      // expected
+    }
+    try {
+      secretMgr.retrievePassword(attemptToken2);
+      fail("attempt token should not still be valid");
+    } catch (InvalidToken e) {
+      // expected
+    }
+
+    stateStore.close();
+  }
+
+  private NMTokenIdentifier getNMTokenId(
+      org.apache.hadoop.yarn.api.records.Token token) throws IOException {
+    Token<NMTokenIdentifier> convertedToken =
+        ConverterUtils.convertFromYarn(token, (Text) null);
+    return convertedToken.decodeIdentifier();
+  }
+
+  private static class NMTokenKeyGeneratorForTest extends
+      BaseNMTokenSecretManager {
+    public MasterKey generateKey() {
+      return createNewMasterKey().getMasterKey();
+    }
+  }
+}