Explorar o código

YARN-6127. Add support for work preserving NM restart when AMRMProxy is enabled. (Botong Huang via asuresh).

Arun Suresh %!s(int64=8) %!d(string=hai) anos
pai
achega
49aa60e50d
Modificáronse 15 ficheiros con 1001 adicións e 81 borrados
  1. 11 6
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyApplicationContextImpl.java
  2. 152 27
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java
  3. 89 4
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyTokenSecretManager.java
  4. 30 7
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AbstractRequestInterceptor.java
  5. 18 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/RequestInterceptor.java
  6. 4 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
  7. 179 3
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java
  8. 29 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java
  9. 86 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java
  10. 55 23
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java
  11. 116 3
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestAMRMProxyService.java
  12. 61 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestAMRMProxyTokenSecretManager.java
  13. 63 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java
  14. 104 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java
  15. 4 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java

+ 11 - 6
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyApplicationContextImpl.java

@@ -45,12 +45,13 @@ public class AMRMProxyApplicationContextImpl implements
 
   /**
    * Create an instance of the AMRMProxyApplicationContext.
-   * 
-   * @param nmContext
-   * @param conf
-   * @param applicationAttemptId
-   * @param user
-   * @param amrmToken
+   *
+   * @param nmContext NM context
+   * @param conf configuration
+   * @param applicationAttemptId attempt id
+   * @param user user name of the application
+   * @param amrmToken amrmToken issued by RM
+   * @param localToken amrmToken issued by AMRMProxy
    */
   public AMRMProxyApplicationContextImpl(Context nmContext,
       Configuration conf, ApplicationAttemptId applicationAttemptId,
@@ -86,6 +87,8 @@ public class AMRMProxyApplicationContextImpl implements
 
   /**
    * Sets the application's AMRMToken.
+   *
+   * @param amrmToken amrmToken issued by RM
    */
   public synchronized void setAMRMToken(
       Token<AMRMTokenIdentifier> amrmToken) {
@@ -99,6 +102,8 @@ public class AMRMProxyApplicationContextImpl implements
 
   /**
    * Sets the application's AMRMToken.
+   *
+   * @param localToken amrmToken issued by AMRMProxy
    */
   public synchronized void setLocalAMRMToken(
       Token<AMRMTokenIdentifier> localToken) {

+ 152 - 27
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java

@@ -64,7 +64,7 @@ import org.apache.hadoop.yarn.server.nodemanager.Context;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
-
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredAMRMProxyState;
 import org.apache.hadoop.yarn.server.nodemanager.scheduler.DistributedScheduler;
 import org.apache.hadoop.yarn.server.security.MasterKeyData;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
@@ -86,6 +86,10 @@ public class AMRMProxyService extends AbstractService implements
     ApplicationMasterProtocol {
   private static final Logger LOG = LoggerFactory
       .getLogger(AMRMProxyService.class);
+
+  private static final String NMSS_USER_KEY = "user";
+  private static final String NMSS_AMRMTOKEN_KEY = "amrmtoken";
+
   private Server server;
   private final Context nmContext;
   private final AsyncDispatcher dispatcher;
@@ -95,9 +99,9 @@ public class AMRMProxyService extends AbstractService implements
 
   /**
    * Creates an instance of the service.
-   * 
-   * @param nmContext
-   * @param dispatcher
+   *
+   * @param nmContext NM context
+   * @param dispatcher NM dispatcher
    */
   public AMRMProxyService(Context nmContext, AsyncDispatcher dispatcher) {
     super(AMRMProxyService.class.getName());
@@ -112,6 +116,14 @@ public class AMRMProxyService extends AbstractService implements
         new ApplicationEventHandler());
   }
 
+  @Override
+  protected void serviceInit(Configuration conf) throws Exception {
+    super.serviceInit(conf);
+    this.secretManager =
+        new AMRMProxyTokenSecretManager(this.nmContext.getNMStateStore());
+    this.secretManager.init(conf);
+  }
+
   @Override
   protected void serviceStart() throws Exception {
     LOG.info("Starting AMRMProxyService");
@@ -134,7 +146,6 @@ public class AMRMProxyService extends AbstractService implements
             YarnConfiguration.AMRM_PROXY_CLIENT_THREAD_COUNT,
             YarnConfiguration.DEFAULT_AMRM_PROXY_CLIENT_THREAD_COUNT);
 
-    this.secretManager = new AMRMProxyTokenSecretManager(serverConf);
     this.secretManager.start();
 
     this.server =
@@ -160,6 +171,62 @@ public class AMRMProxyService extends AbstractService implements
     super.serviceStop();
   }
 
+  /**
+   * Recover from NM state store. Called after serviceInit before serviceStart.
+   *
+   * @throws IOException if recover fails
+   */
+  public void recover() throws IOException {
+    LOG.info("Recovering AMRMProxyService");
+
+    RecoveredAMRMProxyState state =
+        this.nmContext.getNMStateStore().loadAMRMProxyState();
+
+    this.secretManager.recover(state);
+
+    LOG.info("Recovering {} running applications for AMRMProxy",
+        state.getAppContexts().size());
+    for (Map.Entry<ApplicationAttemptId, Map<String, byte[]>> entry : state
+        .getAppContexts().entrySet()) {
+      ApplicationAttemptId attemptId = entry.getKey();
+      LOG.info("Recovering app attempt {}", attemptId);
+
+      // Try recover for the running application attempt
+      try {
+        String user = null;
+        Token<AMRMTokenIdentifier> amrmToken = null;
+        for (Map.Entry<String, byte[]> contextEntry : entry.getValue()
+            .entrySet()) {
+          if (contextEntry.getKey().equals(NMSS_USER_KEY)) {
+            user = new String(contextEntry.getValue(), "UTF-8");
+          } else if (contextEntry.getKey().equals(NMSS_AMRMTOKEN_KEY)) {
+            amrmToken = new Token<>();
+            amrmToken.decodeFromUrlString(
+                new String(contextEntry.getValue(), "UTF-8"));
+          }
+        }
+
+        if (amrmToken == null) {
+          throw new IOException(
+              "No amrmToken found for app attempt " + attemptId);
+        }
+        if (user == null) {
+          throw new IOException("No user found for app attempt " + attemptId);
+        }
+
+        Token<AMRMTokenIdentifier> localToken =
+            this.secretManager.createAndGetAMRMToken(attemptId);
+
+        initializePipeline(attemptId, user, amrmToken, localToken,
+            entry.getValue(), true);
+      } catch (Exception e) {
+        LOG.error("Exception when recovering " + attemptId
+            + ", removing it from NMStateStore and move on", e);
+        this.nmContext.getNMStateStore().removeAMRMProxyAppContext(attemptId);
+      }
+    }
+  }
+
   /**
    * This is called by the AMs started on this node to register with the RM.
    * This method does the initial authorization and then forwards the request to
@@ -221,8 +288,8 @@ public class AMRMProxyService extends AbstractService implements
    * application request processing pipeline.
    *
    * @param request - encapsulates information for starting an AM
-   * @throws IOException
-   * @throws YarnException
+   * @throws IOException if fails
+   * @throws YarnException if fails
    */
   public void processApplicationStartRequest(StartContainerRequest request)
       throws IOException, YarnException {
@@ -257,22 +324,25 @@ public class AMRMProxyService extends AbstractService implements
     request.getContainerLaunchContext().setTokens(
         ByteBuffer.wrap(dob.getData(), 0, dob.getLength()));
 
-    initializePipeline(containerTokenIdentifierForKey.getContainerID()
-        .getApplicationAttemptId(),
-        containerTokenIdentifierForKey.getApplicationSubmitter(),
-        amrmToken, localToken);
+    initializePipeline(appAttemptId,
+        containerTokenIdentifierForKey.getApplicationSubmitter(), amrmToken,
+        localToken, null, false);
   }
 
   /**
    * Initializes the request intercepter pipeline for the specified application.
-   * 
-   * @param applicationAttemptId
-   * @param user
-   * @param amrmToken
+   *
+   * @param applicationAttemptId attempt id
+   * @param user user name
+   * @param amrmToken amrmToken issued by RM
+   * @param localToken amrmToken issued by AMRMProxy
+   * @param recoveredDataMap the recovered states for AMRMProxy from NMSS
+   * @param isRecovery whether this is to recover a previously existing pipeline
    */
   protected void initializePipeline(ApplicationAttemptId applicationAttemptId,
       String user, Token<AMRMTokenIdentifier> amrmToken,
-      Token<AMRMTokenIdentifier> localToken) {
+      Token<AMRMTokenIdentifier> localToken,
+      Map<String, byte[]> recoveredDataMap, boolean isRecovery) {
     RequestInterceptorChainWrapper chainWrapper = null;
     synchronized (applPipelineMap) {
       if (applPipelineMap
@@ -288,11 +358,23 @@ public class AMRMProxyService extends AbstractService implements
             && chainWrapperBackup.getApplicationAttemptId() != null
             && !chainWrapperBackup.getApplicationAttemptId()
                 .equals(applicationAttemptId)) {
+          // TODO: revisit in AMRMProxy HA in YARN-6128
           // Remove the existing pipeline
           LOG.info("Remove the previous pipeline for ApplicationId: "
               + applicationAttemptId.toString());
           RequestInterceptorChainWrapper pipeline =
               applPipelineMap.remove(applicationAttemptId.getApplicationId());
+
+          if (!isRecovery && this.nmContext.getNMStateStore() != null) {
+            try {
+              this.nmContext.getNMStateStore()
+                  .removeAMRMProxyAppContext(applicationAttemptId);
+            } catch (IOException e) {
+              LOG.error("Error removing AMRMProxy application context for "
+                  + applicationAttemptId, e);
+            }
+          }
+
           try {
             pipeline.getRootInterceptor().shutdown();
           } catch (Throwable ex) {
@@ -324,7 +406,27 @@ public class AMRMProxyService extends AbstractService implements
           this.createRequestInterceptorChain();
       interceptorChain.init(createApplicationMasterContext(this.nmContext,
           applicationAttemptId, user, amrmToken, localToken));
+      if (isRecovery) {
+        if (recoveredDataMap == null) {
+          throw new YarnRuntimeException(
+              "null recoveredDataMap recieved for recover");
+        }
+        interceptorChain.recover(recoveredDataMap);
+      }
       chainWrapper.init(interceptorChain, applicationAttemptId);
+
+      if (!isRecovery && this.nmContext.getNMStateStore() != null) {
+        try {
+          this.nmContext.getNMStateStore().storeAMRMProxyAppContextEntry(
+              applicationAttemptId, NMSS_USER_KEY, user.getBytes("UTF-8"));
+          this.nmContext.getNMStateStore().storeAMRMProxyAppContextEntry(
+              applicationAttemptId, NMSS_AMRMTOKEN_KEY,
+              amrmToken.encodeToUrlString().getBytes("UTF-8"));
+        } catch (IOException e) {
+          LOG.error("Error storing AMRMProxy application context entry for "
+              + applicationAttemptId, e);
+        }
+      }
     } catch (Exception e) {
       this.applPipelineMap.remove(applicationAttemptId.getApplicationId());
       throw e;
@@ -335,7 +437,7 @@ public class AMRMProxyService extends AbstractService implements
    * Shuts down the request processing pipeline for the specified application
    * attempt id.
    *
-   * @param applicationId
+   * @param applicationId application id
    */
   protected void stopApplication(ApplicationId applicationId) {
     Preconditions.checkArgument(applicationId != null,
@@ -362,6 +464,17 @@ public class AMRMProxyService extends AbstractService implements
             "Failed to shutdown the request processing pipeline for app:"
                 + applicationId, ex);
       }
+
+      // Remove the app context from NMSS after the interceptors are shutdown
+      if (this.nmContext.getNMStateStore() != null) {
+        try {
+          this.nmContext.getNMStateStore()
+              .removeAMRMProxyAppContext(pipeline.getApplicationAttemptId());
+        } catch (IOException e) {
+          LOG.error("Error removing AMRMProxy application context for "
+              + applicationId, e);
+        }
+      }
     }
   }
 
@@ -383,12 +496,24 @@ public class AMRMProxyService extends AbstractService implements
       // Do not propagate this info back to AM
       allocateResponse.setAMRMToken(null);
 
-      org.apache.hadoop.security.token.Token<AMRMTokenIdentifier> newTokenId =
+      org.apache.hadoop.security.token.Token<AMRMTokenIdentifier> newToken =
           new org.apache.hadoop.security.token.Token<AMRMTokenIdentifier>(
               token.getIdentifier().array(), token.getPassword().array(),
               new Text(token.getKind()), new Text(token.getService()));
 
-      context.setAMRMToken(newTokenId);
+      context.setAMRMToken(newToken);
+
+      // Update the AMRMToken in context map in NM state store
+      if (this.nmContext.getNMStateStore() != null) {
+        try {
+          this.nmContext.getNMStateStore().storeAMRMProxyAppContextEntry(
+              context.getApplicationAttemptId(), NMSS_AMRMTOKEN_KEY,
+              newToken.encodeToUrlString().getBytes("UTF-8"));
+        } catch (IOException e) {
+          LOG.error("Error storing AMRMProxy application context entry for "
+              + context.getApplicationAttemptId(), e);
+        }
+      }
     }
 
     // Check if the local AMRMToken is rolled up and update the context and
@@ -431,7 +556,7 @@ public class AMRMProxyService extends AbstractService implements
 
   /**
    * Gets the Request intercepter chains for all the applications.
-   * 
+   *
    * @return the request intercepter chains.
    */
   protected Map<ApplicationId, RequestInterceptorChainWrapper> getPipelines() {
@@ -489,7 +614,7 @@ public class AMRMProxyService extends AbstractService implements
   /**
    * Returns the comma separated intercepter class names from the configuration.
    *
-   * @param conf
+   * @param conf configuration
    * @return the intercepter class names as an instance of ArrayList
    */
   private List<String> getInterceptorClassNames(Configuration conf) {
@@ -518,7 +643,7 @@ public class AMRMProxyService extends AbstractService implements
    * processing pipeline.
    *
    * @return the the intercepter wrapper instance
-   * @throws YarnException
+   * @throws YarnException if fails
    */
   private RequestInterceptorChainWrapper authorizeAndGetInterceptorChain()
       throws YarnException {
@@ -612,8 +737,8 @@ public class AMRMProxyService extends AbstractService implements
     /**
      * Initializes the wrapper with the specified parameters.
      * 
-     * @param rootInterceptor
-     * @param applicationAttemptId
+     * @param rootInterceptor the root request intercepter
+     * @param applicationAttemptId attempt id
      */
     public synchronized void init(RequestInterceptor rootInterceptor,
         ApplicationAttemptId applicationAttemptId) {
@@ -623,7 +748,7 @@ public class AMRMProxyService extends AbstractService implements
 
     /**
      * Gets the root request intercepter.
-     * 
+     *
      * @return the root request intercepter
      */
     public synchronized RequestInterceptor getRootInterceptor() {
@@ -632,7 +757,7 @@ public class AMRMProxyService extends AbstractService implements
 
     /**
      * Gets the application attempt identifier.
-     * 
+     *
      * @return the application attempt identifier
      */
     public synchronized ApplicationAttemptId getApplicationAttemptId() {
@@ -641,7 +766,7 @@ public class AMRMProxyService extends AbstractService implements
 
     /**
      * Gets the application identifier.
-     * 
+     *
      * @return the application identifier
      */
     public synchronized ApplicationId getApplicationId() {

+ 89 - 4
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyTokenSecretManager.java

@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;
 
+import java.io.IOException;
 import java.security.SecureRandom;
 import java.util.HashSet;
 import java.util.Set;
@@ -37,6 +38,9 @@ import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.apache.hadoop.yarn.server.api.records.MasterKey;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredAMRMProxyState;
 import org.apache.hadoop.yarn.server.security.MasterKeyData;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -60,17 +64,24 @@ public class AMRMProxyTokenSecretManager extends
   private final Lock writeLock = readWriteLock.writeLock();
 
   private final Timer timer;
-  private final long rollingInterval;
-  private final long activationDelay;
+  private long rollingInterval;
+  private long activationDelay;
+
+  private NMStateStoreService nmStateStore;
 
   private final Set<ApplicationAttemptId> appAttemptSet =
       new HashSet<ApplicationAttemptId>();
 
   /**
    * Create an {@link AMRMProxyTokenSecretManager}.
+   * @param nmStateStoreService NM state store
    */
-  public AMRMProxyTokenSecretManager(Configuration conf) {
+  public AMRMProxyTokenSecretManager(NMStateStoreService nmStateStoreService) {
     this.timer = new Timer();
+    this.nmStateStore = nmStateStoreService;
+  }
+
+  public void init(Configuration conf) {
     this.rollingInterval =
         conf.getLong(
             YarnConfiguration.RM_AMRM_TOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS,
@@ -94,6 +105,14 @@ public class AMRMProxyTokenSecretManager extends
   public void start() {
     if (this.currentMasterKey == null) {
       this.currentMasterKey = createNewMasterKey();
+      if (this.nmStateStore != null) {
+        try {
+          this.nmStateStore.storeAMRMProxyCurrentMasterKey(
+              this.currentMasterKey.getMasterKey());
+        } catch (IOException e) {
+          LOG.error("Unable to update current master key in state store", e);
+        }
+      }
     }
     this.timer.scheduleAtFixedRate(new MasterKeyRoller(), rollingInterval,
         rollingInterval);
@@ -103,6 +122,11 @@ public class AMRMProxyTokenSecretManager extends
     this.timer.cancel();
   }
 
+  @VisibleForTesting
+  public void setNMStateStoreService(NMStateStoreService nmStateStoreService) {
+    this.nmStateStore = nmStateStoreService;
+  }
+
   public void applicationMasterFinished(ApplicationAttemptId appAttemptId) {
     this.writeLock.lock();
     try {
@@ -122,11 +146,21 @@ public class AMRMProxyTokenSecretManager extends
   }
 
   @Private
-  void rollMasterKey() {
+  @VisibleForTesting
+  public void rollMasterKey() {
     this.writeLock.lock();
     try {
       LOG.info("Rolling master-key for amrm-tokens");
       this.nextMasterKey = createNewMasterKey();
+      if (this.nmStateStore != null) {
+        try {
+          this.nmStateStore
+              .storeAMRMProxyNextMasterKey(this.nextMasterKey.getMasterKey());
+        } catch (IOException e) {
+          LOG.error("Unable to update next master key in state store", e);
+        }
+      }
+
       this.timer.schedule(new NextKeyActivator(), this.activationDelay);
     } finally {
       this.writeLock.unlock();
@@ -140,6 +174,8 @@ public class AMRMProxyTokenSecretManager extends
     }
   }
 
+  @Private
+  @VisibleForTesting
   public void activateNextMasterKey() {
     this.writeLock.lock();
     try {
@@ -147,6 +183,15 @@ public class AMRMProxyTokenSecretManager extends
           + this.nextMasterKey.getMasterKey().getKeyId());
       this.currentMasterKey = this.nextMasterKey;
       this.nextMasterKey = null;
+      if (this.nmStateStore != null) {
+        try {
+          this.nmStateStore.storeAMRMProxyCurrentMasterKey(
+              this.currentMasterKey.getMasterKey());
+          this.nmStateStore.storeAMRMProxyNextMasterKey(null);
+        } catch (IOException e) {
+          LOG.error("Unable to update current master key in state store", e);
+        }
+      }
     } finally {
       this.writeLock.unlock();
     }
@@ -237,6 +282,17 @@ public class AMRMProxyTokenSecretManager extends
     return new AMRMTokenIdentifier();
   }
 
+  @Private
+  @VisibleForTesting
+  public MasterKeyData getCurrentMasterKeyData() {
+    this.readLock.lock();
+    try {
+      return this.currentMasterKey;
+    } finally {
+      this.readLock.unlock();
+    }
+  }
+
   @Private
   @VisibleForTesting
   public MasterKeyData getNextMasterKeyData() {
@@ -262,4 +318,33 @@ public class AMRMProxyTokenSecretManager extends
       this.readLock.unlock();
     }
   }
+
+  /**
+   * Recover secretManager from state store. Called after serviceInit before
+   * serviceStart.
+   *
+   * @param state the state to recover from
+   */
+  public void recover(RecoveredAMRMProxyState state) {
+    if (state != null) {
+      // recover the current master key
+      MasterKey currentKey = state.getCurrentMasterKey();
+      if (currentKey != null) {
+        this.currentMasterKey = new MasterKeyData(currentKey,
+            createSecretKey(currentKey.getBytes().array()));
+      } else {
+        LOG.warn("No current master key recovered from NM StateStore"
+            + " for AMRMProxyTokenSecretManager");
+      }
+
+      // recover the next master key if not null
+      MasterKey nextKey = state.getNextMasterKey();
+      if (nextKey != null) {
+        this.nextMasterKey = new MasterKeyData(nextKey,
+            createSecretKey(nextKey.getBytes().array()));
+        this.timer.schedule(new NextKeyActivator(), this.activationDelay);
+      }
+    }
+  }
+
 }

+ 30 - 7
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AbstractRequestInterceptor.java

@@ -18,16 +18,17 @@
 
 package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;
 
-import org.apache.hadoop.conf.Configuration;
+import java.io.IOException;
+import java.util.Map;
 
 import com.google.common.base.Preconditions;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateResponse;
 import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterDistributedSchedulingAMResponse;
-
-import java.io.IOException;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
 
 /**
  * Implements the RequestInterceptor interface and provides common functionality
@@ -82,6 +83,16 @@ public abstract class AbstractRequestInterceptor implements
     }
   }
 
+  /**
+   * Recover {@link RequestInterceptor} state from store.
+   */
+  @Override
+  public void recover(Map<String, byte[]> recoveredDataMap) {
+    if (this.nextInterceptor != null) {
+      this.nextInterceptor.recover(recoveredDataMap);
+    }
+  }
+
   /**
    * Disposes the {@link RequestInterceptor}.
    */
@@ -113,8 +124,8 @@ public abstract class AbstractRequestInterceptor implements
    *
    * @param request ApplicationMaster allocate request
    * @return Distribtued Scheduler Allocate Response
-   * @throws YarnException
-   * @throws IOException
+   * @throws YarnException if fails
+   * @throws IOException if fails
    */
   @Override
   public DistributedSchedulingAllocateResponse allocateForDistributedScheduling(
@@ -130,8 +141,8 @@ public abstract class AbstractRequestInterceptor implements
    *
    * @param request ApplicationMaster registration request
    * @return Distributed Scheduler Register Response
-   * @throws YarnException
-   * @throws IOException
+   * @throws YarnException if fails
+   * @throws IOException if fails
    */
   @Override
   public RegisterDistributedSchedulingAMResponse
@@ -141,4 +152,16 @@ public abstract class AbstractRequestInterceptor implements
     return (this.nextInterceptor != null) ? this.nextInterceptor
         .registerApplicationMasterForDistributedScheduling(request) : null;
   }
+
+  /**
+   * A helper method for getting NM state store.
+   *
+   * @return the NMSS instance
+   */
+  public NMStateStoreService getNMStateStore() {
+    if (this.appContext == null || this.appContext.getNMCotext() == null) {
+      return null;
+    }
+    return this.appContext.getNMCotext().getNMStateStore();
+  }
 }

+ 18 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/RequestInterceptor.java

@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;
 
+import java.util.Map;
+
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.yarn.server.api.DistributedSchedulingAMProtocol;
 
@@ -32,10 +34,24 @@ public interface RequestInterceptor extends DistributedSchedulingAMProtocol,
    * This method is called for initializing the intercepter. This is guaranteed
    * to be called only once in the lifetime of this instance.
    *
-   * @param ctx
+   * @param ctx AMRMProxy application context
    */
   void init(AMRMProxyApplicationContext ctx);
 
+  /**
+   * Recover intercepter state when NM recovery is enabled. AMRMProxy will
+   * recover the data map into
+   * AMRMProxyApplicationContext.getRecoveredDataMap(). All intercepters should
+   * recover state from it.
+   *
+   * For example, registerRequest has to be saved by the last intercepter (i.e.
+   * the one that actually connects to RM), in order to re-register when RM
+   * fails over.
+   *
+   * @param recoveredDataMap states for all intercepters recovered from NMSS
+   */
+  void recover(Map<String, byte[]> recoveredDataMap);
+
   /**
    * This method is called to release the resources held by the intercepter.
    * This will be called when the application pipeline is being destroyed. The
@@ -51,7 +67,7 @@ public interface RequestInterceptor extends DistributedSchedulingAMProtocol,
    * send the messages to the resource manager service and so the last
    * intercepter will not receive this method call.
    *
-   * @param nextInterceptor
+   * @param nextInterceptor the next intercepter to set
    */
   void setNextInterceptor(RequestInterceptor nextInterceptor);
 

+ 4 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java

@@ -341,6 +341,10 @@ public class ContainerManagerImpl extends CompositeService implements
       rsrcLocalizationSrvc.recoverLocalizedResources(
           stateStore.loadLocalizationState());
 
+      if (this.amrmProxyEnabled) {
+        this.getAMRMProxyService().recover();
+      }
+
       RecoveredApplicationsState appsState = stateStore.loadApplicationsState();
       for (ContainerManagerApplicationProto proto :
            appsState.getApplications()) {

+ 179 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java

@@ -24,12 +24,15 @@ import static org.fusesource.leveldbjni.JniDBFactory.bytes;
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Timer;
 import java.util.TimerTask;
+import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -81,9 +84,8 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
 
   private static final String DB_NAME = "yarn-nm-state";
   private static final String DB_SCHEMA_VERSION_KEY = "nm-schema-version";
-  
-  private static final Version CURRENT_VERSION_INFO = Version
-      .newInstance(2, 0);
+
+  private static final Version CURRENT_VERSION_INFO = Version.newInstance(3, 0);
 
   private static final String DELETION_TASK_KEY_PREFIX =
       "DeletionService/deltask_";
@@ -122,6 +124,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
 
   private static final String CURRENT_MASTER_KEY_SUFFIX = "CurrentMasterKey";
   private static final String PREV_MASTER_KEY_SUFFIX = "PreviousMasterKey";
+  private static final String NEXT_MASTER_KEY_SUFFIX = "NextMasterKey";
   private static final String NM_TOKENS_KEY_PREFIX = "NMTokens/";
   private static final String NM_TOKENS_CURRENT_MASTER_KEY =
       NM_TOKENS_KEY_PREFIX + CURRENT_MASTER_KEY_SUFFIX;
@@ -136,6 +139,8 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
 
   private static final String LOG_DELETER_KEY_PREFIX = "LogDeleters/";
 
+  private static final String AMRMPROXY_KEY_PREFIX = "AMRMProxy/";
+
   private static final byte[] EMPTY_VALUE = new byte[0];
 
   private DB db;
@@ -1125,6 +1130,177 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
     return LOG_DELETER_KEY_PREFIX + appId;
   }
 
+  @Override
+  public RecoveredAMRMProxyState loadAMRMProxyState() throws IOException {
+    RecoveredAMRMProxyState result = new RecoveredAMRMProxyState();
+    Set<String> unknownKeys = new HashSet<>();
+    LeveldbIterator iter = null;
+    try {
+      iter = new LeveldbIterator(db);
+      iter.seek(bytes(AMRMPROXY_KEY_PREFIX));
+      while (iter.hasNext()) {
+        Entry<byte[], byte[]> entry = iter.peekNext();
+        String key = asString(entry.getKey());
+        if (!key.startsWith(AMRMPROXY_KEY_PREFIX)) {
+          break;
+        }
+
+        String suffix = key.substring(AMRMPROXY_KEY_PREFIX.length());
+        if (suffix.equals(CURRENT_MASTER_KEY_SUFFIX)) {
+          iter.next();
+          result.setCurrentMasterKey(parseMasterKey(entry.getValue()));
+          LOG.info("Recovered for AMRMProxy: current master key id "
+              + result.getCurrentMasterKey().getKeyId());
+
+        } else if (suffix.equals(NEXT_MASTER_KEY_SUFFIX)) {
+          iter.next();
+          result.setNextMasterKey(parseMasterKey(entry.getValue()));
+          LOG.info("Recovered for AMRMProxy: next master key id "
+              + result.getNextMasterKey().getKeyId());
+
+        } else { // Load AMRMProxy application context map for an app attempt
+          // Parse appAttemptId, also handle the unknown keys
+          int idEndPos;
+          ApplicationAttemptId attemptId;
+          try {
+            idEndPos = key.indexOf('/', AMRMPROXY_KEY_PREFIX.length());
+            if (idEndPos < 0) {
+              throw new IOException(
+                  "Unable to determine attemptId in key: " + key);
+            }
+            attemptId = ApplicationAttemptId.fromString(
+                key.substring(AMRMPROXY_KEY_PREFIX.length(), idEndPos));
+          } catch (Exception e) {
+            // Try to move on for back-forward compatibility
+            LOG.warn("Unknown key " + key + ", remove and move on", e);
+            // Do this because iter.remove() is not supported here
+            unknownKeys.add(key);
+            continue;
+          }
+          // Parse the context map for the appAttemptId
+          Map<String, byte[]> appContext =
+              loadAMRMProxyAppContextMap(iter, key.substring(0, idEndPos + 1));
+          result.getAppContexts().put(attemptId, appContext);
+
+          LOG.info("Recovered for AMRMProxy: " + attemptId + ", map size "
+              + appContext.size());
+        }
+      }
+    } catch (DBException e) {
+      throw new IOException(e);
+    } finally {
+      if (iter != null) {
+        iter.close();
+      }
+    }
+
+    // Delete all unknown keys
+    try {
+      for (String key : unknownKeys) {
+        db.delete(bytes(key));
+      }
+    } catch (DBException e) {
+      throw new IOException(e);
+    }
+
+    return result;
+  }
+
+  private Map<String, byte[]> loadAMRMProxyAppContextMap(LeveldbIterator iter,
+      String keyPrefix) throws IOException {
+    Map<String, byte[]> appContextMap = new HashMap<>();
+    while (iter.hasNext()) {
+      Entry<byte[], byte[]> entry = iter.peekNext();
+      String key = asString(entry.getKey());
+      if (!key.startsWith(keyPrefix)) {
+        break;
+      }
+      iter.next();
+      String suffix = key.substring(keyPrefix.length());
+      byte[] data = entry.getValue();
+      appContextMap.put(suffix, Arrays.copyOf(data, data.length));
+    }
+    return appContextMap;
+  }
+
+  @Override
+  public void storeAMRMProxyCurrentMasterKey(MasterKey key) throws IOException {
+    storeMasterKey(AMRMPROXY_KEY_PREFIX + CURRENT_MASTER_KEY_SUFFIX, key);
+  }
+
+  @Override
+  public void storeAMRMProxyNextMasterKey(MasterKey key) throws IOException {
+    String dbkey = AMRMPROXY_KEY_PREFIX + NEXT_MASTER_KEY_SUFFIX;
+    if (key == null) {
+      // When key is null, delete the entry instead
+      try {
+        db.delete(bytes(dbkey));
+      } catch (DBException e) {
+        throw new IOException(e);
+      }
+      return;
+    }
+    storeMasterKey(dbkey, key);
+  }
+
+  @Override
+  public void storeAMRMProxyAppContextEntry(ApplicationAttemptId attempt,
+      String key, byte[] data) throws IOException {
+    String fullkey = AMRMPROXY_KEY_PREFIX + attempt + "/" + key;
+    try {
+      db.put(bytes(fullkey), data);
+    } catch (DBException e) {
+      throw new IOException(e);
+    }
+  }
+
+  @Override
+  public void removeAMRMProxyAppContextEntry(ApplicationAttemptId attempt,
+      String key) throws IOException {
+    String fullkey = AMRMPROXY_KEY_PREFIX + attempt + "/" + key;
+    try {
+      db.delete(bytes(fullkey));
+    } catch (DBException e) {
+      throw new IOException(e);
+    }
+  }
+
+  @Override
+  public void removeAMRMProxyAppContext(ApplicationAttemptId attempt)
+      throws IOException {
+    Set<String> candidates = new HashSet<>();
+    String keyPrefix = AMRMPROXY_KEY_PREFIX + attempt + "/";
+    LeveldbIterator iter = null;
+    try {
+      iter = new LeveldbIterator(db);
+      iter.seek(bytes(keyPrefix));
+      while (iter.hasNext()) {
+        Entry<byte[], byte[]> entry = iter.next();
+        String key = asString(entry.getKey());
+        if (!key.startsWith(keyPrefix)) {
+          break;
+        }
+        // Do this because iter.remove() is not supported here
+        candidates.add(key);
+      }
+    } catch (DBException e) {
+      throw new IOException(e);
+    } finally {
+      if (iter != null) {
+        iter.close();
+      }
+    }
+
+    // Delete all candidate keys
+    try {
+      for (String key : candidates) {
+        db.delete(bytes(key));
+      }
+    } catch (DBException e) {
+      throw new IOException(e);
+    }
+  }
+
   @Override
   protected void initStorage(Configuration conf)
       throws IOException {

+ 29 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java

@@ -228,6 +228,35 @@ public class NMNullStateStoreService extends NMStateStoreService {
   public void removeLogDeleter(ApplicationId appId) throws IOException {
   }
 
+  @Override
+  public RecoveredAMRMProxyState loadAMRMProxyState() throws IOException {
+    throw new UnsupportedOperationException(
+        "Recovery not supported by this state store");
+  }
+
+  @Override
+  public void storeAMRMProxyCurrentMasterKey(MasterKey key) throws IOException {
+  }
+
+  @Override
+  public void storeAMRMProxyNextMasterKey(MasterKey key) throws IOException {
+  }
+
+  @Override
+  public void storeAMRMProxyAppContextEntry(ApplicationAttemptId attempt,
+      String key, byte[] data) throws IOException {
+  }
+
+  @Override
+  public void removeAMRMProxyAppContextEntry(ApplicationAttemptId attempt,
+      String key) throws IOException {
+  }
+
+  @Override
+  public void removeAMRMProxyAppContext(ApplicationAttemptId attempt)
+      throws IOException {
+  }
+
   @Override
   protected void initStorage(Configuration conf) throws IOException {
   }

+ 86 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java

@@ -266,6 +266,41 @@ public abstract class NMStateStoreService extends AbstractService {
     }
   }
 
+  /**
+   * Recovered states for AMRMProxy.
+   */
+  public static class RecoveredAMRMProxyState {
+    private MasterKey currentMasterKey;
+    private MasterKey nextMasterKey;
+    // For each app, stores amrmToken, user name, as well as various AMRMProxy
+    // intercepter states
+    private Map<ApplicationAttemptId, Map<String, byte[]>> appContexts;
+
+    public RecoveredAMRMProxyState() {
+      appContexts = new HashMap<>();
+    }
+
+    public MasterKey getCurrentMasterKey() {
+      return currentMasterKey;
+    }
+
+    public MasterKey getNextMasterKey() {
+      return nextMasterKey;
+    }
+
+    public Map<ApplicationAttemptId, Map<String, byte[]>> getAppContexts() {
+      return appContexts;
+    }
+
+    public void setCurrentMasterKey(MasterKey currentKey) {
+      currentMasterKey = currentKey;
+    }
+
+    public void setNextMasterKey(MasterKey nextKey) {
+      nextMasterKey = nextKey;
+    }
+  }
+
   /** Initialize the state storage */
   @Override
   public void serviceInit(Configuration conf) throws IOException {
@@ -601,6 +636,57 @@ public abstract class NMStateStoreService extends AbstractService {
   public abstract void removeLogDeleter(ApplicationId appId)
       throws IOException;
 
+  /**
+   * Load the state of AMRMProxy.
+   * @return recovered state of AMRMProxy
+   * @throws IOException if fails
+   */
+  public abstract RecoveredAMRMProxyState loadAMRMProxyState()
+      throws IOException;
+
+  /**
+   * Record the current AMRMProxyTokenSecretManager master key.
+   * @param key the current master key
+   * @throws IOException if fails
+   */
+  public abstract void storeAMRMProxyCurrentMasterKey(MasterKey key)
+      throws IOException;
+
+  /**
+   * Record the next AMRMProxyTokenSecretManager master key.
+   * @param key the next master key
+   * @throws IOException if fails
+   */
+  public abstract void storeAMRMProxyNextMasterKey(MasterKey key)
+      throws IOException;
+
+  /**
+   * Add a context entry for an application attempt in AMRMProxyService.
+   * @param attempt app attempt ID
+   * @param key key string
+   * @param data state data to store
+   * @throws IOException if fails
+   */
+  public abstract void storeAMRMProxyAppContextEntry(
+      ApplicationAttemptId attempt, String key, byte[] data) throws IOException;
+
+  /**
+   * Remove a context entry for an application attempt in AMRMProxyService.
+   * @param attempt attempt ID
+   * @param key key string
+   * @throws IOException if fails
+   */
+  public abstract void removeAMRMProxyAppContextEntry(
+      ApplicationAttemptId attempt, String key) throws IOException;
+
+  /**
+   * Remove the entire context map for an application attempt in
+   * AMRMProxyService.
+   * @param attempt attempt ID
+   * @throws IOException if fails
+   */
+  public abstract void removeAMRMProxyAppContext(ApplicationAttemptId attempt)
+      throws IOException;
 
   protected abstract void initStorage(Configuration conf) throws IOException;
 

+ 55 - 23
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java

@@ -39,6 +39,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
@@ -62,12 +63,13 @@ import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
 import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
 import org.apache.hadoop.yarn.server.nodemanager.Context;
 import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
+import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext;
 import org.apache.hadoop.yarn.server.nodemanager.NodeResourceMonitor;
 import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManager;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
-
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMMemoryStateStoreService;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
 import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerAllocator;
 import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
@@ -87,17 +89,15 @@ import org.junit.Before;
 public abstract class BaseAMRMProxyTest {
   private static final Log LOG = LogFactory
       .getLog(BaseAMRMProxyTest.class);
-  /**
-   * The AMRMProxyService instance that will be used by all the test cases
-   */
+  // The AMRMProxyService instance that will be used by all the test cases
   private MockAMRMProxyService amrmProxyService;
-  /**
-   * Thread pool used for asynchronous operations
-   */
+
+  // Thread pool used for asynchronous operations
   private static ExecutorService threadpool = Executors
       .newCachedThreadPool();
   private Configuration conf;
   private AsyncDispatcher dispatcher;
+  private Context nmContext;
 
   protected MockAMRMProxyService getAMRMProxyService() {
     Assert.assertNotNull(this.amrmProxyService);
@@ -105,32 +105,40 @@ public abstract class BaseAMRMProxyTest {
   }
 
   @Before
-  public void setUp() {
-    this.conf = new YarnConfiguration();
-    this.conf.setBoolean(YarnConfiguration.AMRM_PROXY_ENABLED, true);
+  public void setUp() throws IOException {
+    this.conf = createConfiguration();
+    this.dispatcher = new AsyncDispatcher();
+    this.dispatcher.init(this.conf);
+    this.dispatcher.start();
+    createAndStartAMRMProxyService(this.conf);
+  }
+
+  protected YarnConfiguration createConfiguration() {
+    YarnConfiguration config = new YarnConfiguration();
+    config.setBoolean(YarnConfiguration.AMRM_PROXY_ENABLED, true);
     String mockPassThroughInterceptorClass =
         PassThroughRequestInterceptor.class.getName();
 
     // Create a request intercepter pipeline for testing. The last one in the
     // chain will call the mock resource manager. The others in the chain will
     // simply forward it to the next one in the chain
-    this.conf.set(YarnConfiguration.AMRM_PROXY_INTERCEPTOR_CLASS_PIPELINE,
-        mockPassThroughInterceptorClass + ","
-            + mockPassThroughInterceptorClass + ","
-            + mockPassThroughInterceptorClass + ","
+    config.set(YarnConfiguration.AMRM_PROXY_INTERCEPTOR_CLASS_PIPELINE,
+        mockPassThroughInterceptorClass + "," + mockPassThroughInterceptorClass
+            + "," + mockPassThroughInterceptorClass + ","
             + MockRequestInterceptor.class.getName());
 
-    this.dispatcher = new AsyncDispatcher();
-    this.dispatcher.init(this.conf);
-    this.dispatcher.start();
-    createAndStartAMRMProxyService(this.conf);
+    config.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, true);
+    return config;
   }
 
   @After
   public void tearDown() {
-    amrmProxyService.stop();
-    amrmProxyService = null;
+    this.amrmProxyService.stop();
+    this.amrmProxyService = null;
     this.dispatcher.stop();
+    if (this.nmContext.getNMStateStore() != null) {
+      this.nmContext.getNMStateStore().stop();
+    }
   }
 
   protected ExecutorService getThreadPool() {
@@ -141,17 +149,33 @@ public abstract class BaseAMRMProxyTest {
     return this.conf;
   }
 
-  protected void createAndStartAMRMProxyService(Configuration config) {
+  protected AsyncDispatcher getDispatcher() {
+    return this.dispatcher;
+  }
+
+  protected void createAndStartAMRMProxyService(Configuration config)
+      throws IOException {
     // Stop the existing instance first if not null
     if (this.amrmProxyService != null) {
       this.amrmProxyService.stop();
     }
+    if (this.nmContext == null) {
+      this.nmContext = createContext();
+    }
     this.amrmProxyService =
-        new MockAMRMProxyService(new NullContext(), dispatcher);
+        new MockAMRMProxyService(this.nmContext, this.dispatcher);
     this.amrmProxyService.init(config);
+    this.amrmProxyService.recover();
     this.amrmProxyService.start();
   }
 
+  protected Context createContext() {
+    NMMemoryStateStoreService stateStore = new NMMemoryStateStoreService();
+    stateStore.init(this.conf);
+    stateStore.start();
+    return new NMContext(null, null, null, null, stateStore, false, this.conf);
+  }
+
   /**
    * This helper method will invoke the specified function in parallel for each
    * end point in the specified list using a thread pool and return the
@@ -579,6 +603,13 @@ public abstract class BaseAMRMProxyTest {
       super(nmContext, dispatcher);
     }
 
+    @Override
+    protected void serviceStart() throws Exception {
+      // Override this method and do nothing to avoid the base class from
+      // listening to server end point
+      getSecretManager().start();
+    }
+
     /**
      * This method is used by the test code to initialize the pipeline. In the
      * actual service, the initialization is called by the
@@ -588,7 +619,8 @@ public abstract class BaseAMRMProxyTest {
      * @param user
      */
     public void initApp(ApplicationAttemptId applicationId, String user) {
-      super.initializePipeline(applicationId, user, null, null);
+      super.initializePipeline(applicationId, user,
+          new Token<AMRMTokenIdentifier>(), null, null, false);
     }
 
     public void stopApp(ApplicationId applicationId) {

+ 116 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestAMRMProxyService.java

@@ -26,9 +26,12 @@ import java.util.Map;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -38,6 +41,7 @@ import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
 import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.AMRMProxyService.RequestInterceptorChainWrapper;
 import org.apache.hadoop.yarn.util.Records;
 import org.junit.Assert;
@@ -48,6 +52,8 @@ public class TestAMRMProxyService extends BaseAMRMProxyTest {
   private static final Log LOG = LogFactory
       .getLog(TestAMRMProxyService.class);
 
+  private static MockResourceManagerFacade mockRM;
+
   /**
    * Test if the pipeline is created properly.
    */
@@ -99,9 +105,11 @@ public class TestAMRMProxyService extends BaseAMRMProxyTest {
 
   /**
    * Tests the case when interceptor pipeline initialization fails.
+   *
+   * @throws IOException
    */
   @Test
-  public void testInterceptorInitFailure() {
+  public void testInterceptorInitFailure() throws IOException {
     Configuration conf = this.getConf();
     // Override with a bad interceptor configuration
     conf.set(YarnConfiguration.AMRM_PROXY_INTERCEPTOR_CLASS_PIPELINE,
@@ -434,8 +442,8 @@ public class TestAMRMProxyService extends BaseAMRMProxyTest {
     // Second Attempt
 
     applicationAttemptId = ApplicationAttemptId.newInstance(appId, 2);
-    getAMRMProxyService().initializePipeline(applicationAttemptId, user, null,
-        null);
+    getAMRMProxyService().initializePipeline(applicationAttemptId, user,
+        new Token<AMRMTokenIdentifier>(), null, null, false);
 
     RequestInterceptorChainWrapper chain2 =
         getAMRMProxyService().getPipelines().get(appId);
@@ -559,4 +567,109 @@ public class TestAMRMProxyService extends BaseAMRMProxyTest {
     Assert.assertEquals(relList.size(),
         containersForReleasedContainerIds.size());
   }
+
+  /**
+   * Test AMRMProxy restart with recovery.
+   */
+  @Test
+  public void testRecovery() throws YarnException, Exception {
+
+    Configuration conf = createConfiguration();
+    // Use the MockRequestInterceptorAcrossRestart instead for the chain
+    conf.set(YarnConfiguration.AMRM_PROXY_INTERCEPTOR_CLASS_PIPELINE,
+        MockRequestInterceptorAcrossRestart.class.getName());
+
+    mockRM = new MockResourceManagerFacade(new YarnConfiguration(conf), 0);
+
+    createAndStartAMRMProxyService(conf);
+
+    int testAppId1 = 1;
+    RegisterApplicationMasterResponse registerResponse =
+        registerApplicationMaster(testAppId1);
+    Assert.assertNotNull(registerResponse);
+    Assert.assertEquals(Integer.toString(testAppId1),
+        registerResponse.getQueue());
+
+    int testAppId2 = 2;
+    registerResponse = registerApplicationMaster(testAppId2);
+    Assert.assertNotNull(registerResponse);
+    Assert.assertEquals(Integer.toString(testAppId2),
+        registerResponse.getQueue());
+
+    AllocateResponse allocateResponse = allocate(testAppId2);
+    Assert.assertNotNull(allocateResponse);
+
+    // At the time of kill, app1 just registerAM, app2 already did one allocate.
+    // Both application should be recovered
+    createAndStartAMRMProxyService(conf);
+    Assert.assertTrue(getAMRMProxyService().getPipelines().size() == 2);
+
+    allocateResponse = allocate(testAppId1);
+    Assert.assertNotNull(allocateResponse);
+
+    FinishApplicationMasterResponse finshResponse =
+        finishApplicationMaster(testAppId1, FinalApplicationStatus.SUCCEEDED);
+    Assert.assertNotNull(finshResponse);
+    Assert.assertEquals(true, finshResponse.getIsUnregistered());
+
+    allocateResponse = allocate(testAppId2);
+    Assert.assertNotNull(allocateResponse);
+
+    finshResponse =
+        finishApplicationMaster(testAppId2, FinalApplicationStatus.SUCCEEDED);
+
+    Assert.assertNotNull(finshResponse);
+    Assert.assertEquals(true, finshResponse.getIsUnregistered());
+
+    int testAppId3 = 3;
+    try {
+      // Try to finish an application master that is not registered.
+      finishApplicationMaster(testAppId3, FinalApplicationStatus.SUCCEEDED);
+      Assert
+          .fail("The Mock RM should complain about not knowing the third app");
+    } catch (Throwable ex) {
+    }
+
+    mockRM = null;
+  }
+
+  /**
+   * A mock intercepter implementation that uses the same mockRM instance across
+   * restart.
+   */
+  public static class MockRequestInterceptorAcrossRestart
+      extends AbstractRequestInterceptor {
+
+    public MockRequestInterceptorAcrossRestart() {
+    }
+
+    @Override
+    public void init(AMRMProxyApplicationContext appContext) {
+      super.init(appContext);
+      if (mockRM == null) {
+        throw new RuntimeException("mockRM not initialized yet");
+      }
+    }
+
+    @Override
+    public RegisterApplicationMasterResponse registerApplicationMaster(
+        RegisterApplicationMasterRequest request)
+        throws YarnException, IOException {
+      return mockRM.registerApplicationMaster(request);
+    }
+
+    @Override
+    public FinishApplicationMasterResponse finishApplicationMaster(
+        FinishApplicationMasterRequest request)
+        throws YarnException, IOException {
+      return mockRM.finishApplicationMaster(request);
+    }
+
+    @Override
+    public AllocateResponse allocate(AllocateRequest request)
+        throws YarnException, IOException {
+      return mockRM.allocate(request);
+    }
+  }
+
 }

+ 61 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestAMRMProxyTokenSecretManager.java

@@ -28,6 +28,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMMemoryStateStoreService;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -40,11 +41,19 @@ public class TestAMRMProxyTokenSecretManager {
 
   private YarnConfiguration conf;
   private AMRMProxyTokenSecretManager secretManager;
+  private NMMemoryStateStoreService stateStore;
 
   @Before
   public void setup() {
     conf = new YarnConfiguration();
-    secretManager = new AMRMProxyTokenSecretManager(conf);
+    conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, true);
+
+    stateStore = new NMMemoryStateStoreService();
+    stateStore.init(conf);
+    stateStore.start();
+
+    secretManager = new AMRMProxyTokenSecretManager(stateStore);
+    secretManager.init(conf);
     secretManager.start();
   }
 
@@ -53,6 +62,9 @@ public class TestAMRMProxyTokenSecretManager {
     if (secretManager != null) {
       secretManager.stop();
     }
+    if (stateStore != null) {
+      stateStore.stop();
+    }
   }
 
   @Test
@@ -78,4 +90,52 @@ public class TestAMRMProxyTokenSecretManager {
     }
   }
 
+  @Test
+  public void testRecovery() throws IOException {
+    ApplicationId appId = ApplicationId.newInstance(1, 1);
+    ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(appId, 1);
+
+    Token<AMRMTokenIdentifier> localToken =
+        secretManager.createAndGetAMRMToken(attemptId);
+
+    AMRMTokenIdentifier identifier = secretManager.createIdentifier();
+    identifier.readFields(new DataInputStream(
+        new ByteArrayInputStream(localToken.getIdentifier())));
+
+    secretManager.retrievePassword(identifier);
+
+    // Generate next master key
+    secretManager.rollMasterKey();
+
+    // Restart and recover
+    secretManager.stop();
+    secretManager = new AMRMProxyTokenSecretManager(stateStore);
+    secretManager.init(conf);
+    secretManager.recover(stateStore.loadAMRMProxyState());
+    secretManager.start();
+    // Recover the app
+    secretManager.createAndGetAMRMToken(attemptId);
+
+    // Current master key should be recovered, and thus pass here
+    secretManager.retrievePassword(identifier);
+
+    // Roll key, current master key will be replaced
+    secretManager.activateNextMasterKey();
+
+    // Restart and recover
+    secretManager.stop();
+    secretManager = new AMRMProxyTokenSecretManager(stateStore);
+    secretManager.init(conf);
+    secretManager.recover(stateStore.loadAMRMProxyState());
+    secretManager.start();
+    // Recover the app
+    secretManager.createAndGetAMRMToken(attemptId);
+
+    try {
+      secretManager.retrievePassword(identifier);
+      Assert.fail("Expect InvalidToken exception because the "
+          + "old master key should have expired");
+    } catch (InvalidToken e) {
+    }
+  }
 }

+ 63 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java

@@ -20,11 +20,10 @@ package org.apache.hadoop.yarn.server.nodemanager.recovery;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
@@ -50,6 +49,7 @@ public class NMMemoryStateStoreService extends NMStateStoreService {
   private RecoveredNMTokensState nmTokenState;
   private RecoveredContainerTokensState containerTokenState;
   private Map<ApplicationId, LogDeleterProto> logDeleterState;
+  private RecoveredAMRMProxyState amrmProxyState;
 
   public NMMemoryStateStoreService() {
     super(NMMemoryStateStoreService.class.getName());
@@ -67,6 +67,7 @@ public class NMMemoryStateStoreService extends NMStateStoreService {
     trackerStates = new HashMap<TrackerKey, TrackerState>();
     deleteTasks = new HashMap<Integer, DeletionServiceDeleteTaskProto>();
     logDeleterState = new HashMap<ApplicationId, LogDeleterProto>();
+    amrmProxyState = new RecoveredAMRMProxyState();
   }
 
   @Override
@@ -417,6 +418,66 @@ public class NMMemoryStateStoreService extends NMStateStoreService {
     logDeleterState.remove(appId);
   }
 
+  @Override
+  public synchronized RecoveredAMRMProxyState loadAMRMProxyState()
+      throws IOException {
+    // return a copy so caller can't modify our state
+    RecoveredAMRMProxyState result = new RecoveredAMRMProxyState();
+    result.setCurrentMasterKey(amrmProxyState.getCurrentMasterKey());
+    result.setNextMasterKey(amrmProxyState.getNextMasterKey());
+    for (Map.Entry<ApplicationAttemptId, Map<String, byte[]>> entry :
+        amrmProxyState.getAppContexts().entrySet()) {
+      result.getAppContexts().put(entry.getKey(),
+          new HashMap<String, byte[]>(entry.getValue()));
+    }
+    return result;
+  }
+
+  @Override
+  public synchronized void storeAMRMProxyCurrentMasterKey(MasterKey key)
+      throws IOException {
+    MasterKeyPBImpl keypb = (MasterKeyPBImpl) key;
+    amrmProxyState.setCurrentMasterKey(new MasterKeyPBImpl(keypb.getProto()));
+  }
+
+  @Override
+  public synchronized void storeAMRMProxyNextMasterKey(MasterKey key)
+      throws IOException {
+    if (key == null) {
+      amrmProxyState.setNextMasterKey(null);
+      return;
+    }
+    MasterKeyPBImpl keypb = (MasterKeyPBImpl) key;
+    amrmProxyState.setNextMasterKey(new MasterKeyPBImpl(keypb.getProto()));
+  }
+
+  @Override
+  public synchronized void storeAMRMProxyAppContextEntry(
+      ApplicationAttemptId attempt, String key, byte[] data)
+      throws IOException {
+    Map<String, byte[]> entryMap = amrmProxyState.getAppContexts().get(attempt);
+    if (entryMap == null) {
+      entryMap = new HashMap<>();
+      amrmProxyState.getAppContexts().put(attempt, entryMap);
+    }
+    entryMap.put(key, Arrays.copyOf(data, data.length));
+  }
+
+  @Override
+  public synchronized void removeAMRMProxyAppContextEntry(
+      ApplicationAttemptId attempt, String key) throws IOException {
+    Map<String, byte[]> entryMap = amrmProxyState.getAppContexts().get(attempt);
+    if (entryMap != null) {
+      entryMap.remove(key);
+    }
+  }
+
+  @Override
+  public synchronized void removeAMRMProxyAppContext(
+      ApplicationAttemptId attempt) throws IOException {
+    amrmProxyState.getAppContexts().remove(attempt);
+  }
+
   private static class TrackerState {
     Map<Path, LocalResourceProto> inProgressMap =
         new HashMap<Path, LocalResourceProto>();

+ 104 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java

@@ -20,10 +20,11 @@ package org.apache.hadoop.yarn.server.nodemanager.recovery;
 
 import static org.fusesource.leveldbjni.JniDBFactory.bytes;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.assertFalse;
 import static org.mockito.Mockito.isNull;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.timeout;
@@ -33,6 +34,7 @@ import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -65,7 +67,9 @@ import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.Localize
 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LogDeleterProto;
 import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
 import org.apache.hadoop.yarn.server.api.records.MasterKey;
+import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.AMRMProxyTokenSecretManager;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.LocalResourceTrackerState;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredAMRMProxyState;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredApplicationsState;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerState;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerStatus;
@@ -1016,6 +1020,105 @@ public class TestNMLeveldbStateStoreService {
     assertNull(stateStore.getDB().get(invalidKey));
   }
 
+  @Test
+  public void testAMRMProxyStorage() throws IOException {
+    RecoveredAMRMProxyState state = stateStore.loadAMRMProxyState();
+    assertEquals(state.getCurrentMasterKey(), null);
+    assertEquals(state.getNextMasterKey(), null);
+    assertEquals(state.getAppContexts().size(), 0);
+
+    ApplicationId appId1 = ApplicationId.newInstance(1, 1);
+    ApplicationId appId2 = ApplicationId.newInstance(1, 2);
+    ApplicationAttemptId attemptId1 =
+        ApplicationAttemptId.newInstance(appId1, 1);
+    ApplicationAttemptId attemptId2 =
+        ApplicationAttemptId.newInstance(appId2, 2);
+    String key1 = "key1";
+    String key2 = "key2";
+    byte[] data1 = "data1".getBytes();
+    byte[] data2 = "data2".getBytes();
+
+    AMRMProxyTokenSecretManager secretManager =
+        new AMRMProxyTokenSecretManager(stateStore);
+    secretManager.init(conf);
+    // Generate currentMasterKey
+    secretManager.start();
+
+    try {
+      // Add two applications, each with two data entries
+      stateStore.storeAMRMProxyAppContextEntry(attemptId1, key1, data1);
+      stateStore.storeAMRMProxyAppContextEntry(attemptId2, key1, data1);
+      stateStore.storeAMRMProxyAppContextEntry(attemptId1, key2, data2);
+      stateStore.storeAMRMProxyAppContextEntry(attemptId2, key2, data2);
+
+      // restart state store and verify recovered
+      restartStateStore();
+      secretManager.setNMStateStoreService(stateStore);
+      state = stateStore.loadAMRMProxyState();
+      assertEquals(state.getCurrentMasterKey(),
+          secretManager.getCurrentMasterKeyData().getMasterKey());
+      assertEquals(state.getNextMasterKey(), null);
+      assertEquals(state.getAppContexts().size(), 2);
+      // app1
+      Map<String, byte[]> map = state.getAppContexts().get(attemptId1);
+      assertNotEquals(map, null);
+      assertEquals(map.size(), 2);
+      assertTrue(Arrays.equals(map.get(key1), data1));
+      assertTrue(Arrays.equals(map.get(key2), data2));
+      // app2
+      map = state.getAppContexts().get(attemptId2);
+      assertNotEquals(map, null);
+      assertEquals(map.size(), 2);
+      assertTrue(Arrays.equals(map.get(key1), data1));
+      assertTrue(Arrays.equals(map.get(key2), data2));
+
+      // Generate next master key and remove one entry of app2
+      secretManager.rollMasterKey();
+      stateStore.removeAMRMProxyAppContextEntry(attemptId2, key1);
+
+      // restart state store and verify recovered
+      restartStateStore();
+      secretManager.setNMStateStoreService(stateStore);
+      state = stateStore.loadAMRMProxyState();
+      assertEquals(state.getCurrentMasterKey(),
+          secretManager.getCurrentMasterKeyData().getMasterKey());
+      assertEquals(state.getNextMasterKey(),
+          secretManager.getNextMasterKeyData().getMasterKey());
+      assertEquals(state.getAppContexts().size(), 2);
+      // app1
+      map = state.getAppContexts().get(attemptId1);
+      assertNotEquals(map, null);
+      assertEquals(map.size(), 2);
+      assertTrue(Arrays.equals(map.get(key1), data1));
+      assertTrue(Arrays.equals(map.get(key2), data2));
+      // app2
+      map = state.getAppContexts().get(attemptId2);
+      assertNotEquals(map, null);
+      assertEquals(map.size(), 1);
+      assertTrue(Arrays.equals(map.get(key2), data2));
+
+      // Activate next master key and remove all entries of app1
+      secretManager.activateNextMasterKey();
+      stateStore.removeAMRMProxyAppContext(attemptId1);
+
+      // restart state store and verify recovered
+      restartStateStore();
+      secretManager.setNMStateStoreService(stateStore);
+      state = stateStore.loadAMRMProxyState();
+      assertEquals(state.getCurrentMasterKey(),
+          secretManager.getCurrentMasterKeyData().getMasterKey());
+      assertEquals(state.getNextMasterKey(), null);
+      assertEquals(state.getAppContexts().size(), 1);
+      // app2 only
+      map = state.getAppContexts().get(attemptId2);
+      assertNotEquals(map, null);
+      assertEquals(map.size(), 1);
+      assertTrue(Arrays.equals(map.get(key2), data2));
+    } finally {
+      secretManager.stop();
+    }
+  }
+
   private static class NMTokenSecretManagerForTest extends
       BaseNMTokenSecretManager {
     public MasterKey generateKey() {

+ 4 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java

@@ -23,6 +23,7 @@ import java.io.IOException;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.Collection;
+import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
@@ -938,9 +939,10 @@ public class MiniYARNCluster extends CompositeService {
     @Override
     protected void initializePipeline(ApplicationAttemptId applicationAttemptId,
         String user, Token<AMRMTokenIdentifier> amrmToken,
-        Token<AMRMTokenIdentifier> localToken) {
+        Token<AMRMTokenIdentifier> localToken,
+        Map<String, byte[]> recoveredDataMap, boolean isRecovery) {
       super.initializePipeline(applicationAttemptId, user, amrmToken,
-          localToken);
+          localToken, recoveredDataMap, isRecovery);
       RequestInterceptor rt = getPipelines()
           .get(applicationAttemptId.getApplicationId()).getRootInterceptor();
       // The DefaultRequestInterceptor will generally be the last