Browse Source

YARN-945. Removed setting of AMRMToken's service from ResourceManager and changed client libraries do it all the time and correctly. Contributed by Vinod Kumar Vavilapalli.
svn merge --ignore-ancestry -c 1508232 ../../trunk/


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1508233 13f79535-47bb-0310-9956-ffa450edef68

Vinod Kumar Vavilapalli 11 years ago
parent
commit
6737f3d3a5
15 changed files with 151 additions and 67 deletions
  1. 3 0
      hadoop-yarn-project/CHANGES.txt
  2. 3 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/main/java/org/apache/hadoop/yarn/applications/unmanagedamlauncher/UnmanagedAMLauncher.java
  3. 30 6
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/ClientRMProxy.java
  4. 1 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/YarnClient.java
  5. 6 10
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java
  6. 13 5
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RMProxy.java
  7. 3 4
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/ServerRMProxy.java
  8. 8 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java
  9. 8 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
  10. 5 21
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
  11. 17 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRMWithCustomAMLauncher.java
  12. 23 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAMAuthorization.java
  13. 0 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java
  14. 14 7
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerUtils.java
  15. 17 6
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestAMRMTokens.java

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -756,6 +756,9 @@ Release 2.1.0-beta - 2013-07-02
     YARN-961. Changed ContainerManager to enforce Token auth irrespective of
     YARN-961. Changed ContainerManager to enforce Token auth irrespective of
     security. (Omkar Vinit Joshi via vinodkv)
     security. (Omkar Vinit Joshi via vinodkv)
 
 
+    YARN-945. Removed setting of AMRMToken's service from ResourceManager
+    and changed client libraries do it all the time and correctly. (vinodkv)
+
   BREAKDOWN OF HADOOP-8562/YARN-191 SUBTASKS AND RELATED JIRAS
   BREAKDOWN OF HADOOP-8562/YARN-191 SUBTASKS AND RELATED JIRAS
 
 
     YARN-158. Yarn creating package-info.java must not depend on sh.
     YARN-158. Yarn creating package-info.java must not depend on sh.

+ 3 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/main/java/org/apache/hadoop/yarn/applications/unmanagedamlauncher/UnmanagedAMLauncher.java

@@ -43,7 +43,6 @@ import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
 import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
-import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
@@ -183,6 +182,9 @@ public class UnmanagedAMLauncher {
     Credentials credentials = new Credentials();
     Credentials credentials = new Credentials();
     Token<AMRMTokenIdentifier> token = 
     Token<AMRMTokenIdentifier> token = 
         rmClient.getAMRMToken(attemptId.getApplicationId());
         rmClient.getAMRMToken(attemptId.getApplicationId());
+    // Service will be empty but that's okay, we are just passing down only
+    // AMRMToken down to the real AM which eventually sets the correct
+    // service-address.
     credentials.addToken(token.getService(), token);
     credentials.addToken(token.getService(), token);
     File tokenFile = File.createTempFile("unmanagedAMRMToken","", 
     File tokenFile = File.createTempFile("unmanagedAMRMToken","", 
         new File(System.getProperty("user.dir")));
         new File(System.getProperty("user.dir")));

+ 30 - 6
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/ClientRMProxy.java

@@ -24,12 +24,17 @@ import java.net.InetSocketAddress;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
 import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
 import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
 import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
 import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol;
 import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol;
 
 
-public class ClientRMProxy<T> extends RMProxy<T>{
+public class ClientRMProxy<T> extends RMProxy<T>  {
 
 
   private static final Log LOG = LogFactory.getLog(ClientRMProxy.class);
   private static final Log LOG = LogFactory.getLog(ClientRMProxy.class);
 
 
@@ -39,7 +44,24 @@ public class ClientRMProxy<T> extends RMProxy<T>{
     return createRMProxy(conf, protocol, rmAddress);
     return createRMProxy(conf, protocol, rmAddress);
   }
   }
 
 
-  private static InetSocketAddress getRMAddress(Configuration conf, Class<?> protocol) {
+  private static void setupTokens(InetSocketAddress resourceManagerAddress)
+      throws IOException {
+    // It is assumed for now that the only AMRMToken in AM's UGI is for this
+    // cluster/RM. TODO: Fix later when we have some kind of cluster-ID as
+    // default service-address, see YARN-986.
+    for (Token<? extends TokenIdentifier> token : UserGroupInformation
+      .getCurrentUser().getTokens()) {
+      if (token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) {
+        // This token needs to be directly provided to the AMs, so set the
+        // appropriate service-name. We'll need more infrastructure when we
+        // need to set it in HA case.
+        SecurityUtil.setTokenService(token, resourceManagerAddress);
+      }
+    }
+  }
+
+  private static InetSocketAddress getRMAddress(Configuration conf,
+      Class<?> protocol) throws IOException {
     if (protocol == ApplicationClientProtocol.class) {
     if (protocol == ApplicationClientProtocol.class) {
       return conf.getSocketAddr(YarnConfiguration.RM_ADDRESS,
       return conf.getSocketAddr(YarnConfiguration.RM_ADDRESS,
           YarnConfiguration.DEFAULT_RM_ADDRESS,
           YarnConfiguration.DEFAULT_RM_ADDRESS,
@@ -50,10 +72,12 @@ public class ClientRMProxy<T> extends RMProxy<T>{
           YarnConfiguration.DEFAULT_RM_ADMIN_ADDRESS,
           YarnConfiguration.DEFAULT_RM_ADMIN_ADDRESS,
           YarnConfiguration.DEFAULT_RM_ADMIN_PORT);
           YarnConfiguration.DEFAULT_RM_ADMIN_PORT);
     } else if (protocol == ApplicationMasterProtocol.class) {
     } else if (protocol == ApplicationMasterProtocol.class) {
-      return conf.getSocketAddr(
-          YarnConfiguration.RM_SCHEDULER_ADDRESS,
-          YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS,
-          YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT);
+      InetSocketAddress serviceAddr =
+          conf.getSocketAddr(YarnConfiguration.RM_SCHEDULER_ADDRESS,
+            YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS,
+            YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT);
+      setupTokens(serviceAddr);
+      return serviceAddr;
     } else {
     } else {
       String message = "Unsupported protocol found when creating the proxy " +
       String message = "Unsupported protocol found when creating the proxy " +
           "connection to ResourceManager: " +
           "connection to ResourceManager: " +

+ 1 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/YarnClient.java

@@ -165,7 +165,7 @@ public abstract class YarnClient extends AbstractService {
    * @throws IOException
    * @throws IOException
    */
    */
   public abstract org.apache.hadoop.security.token.Token<AMRMTokenIdentifier>
   public abstract org.apache.hadoop.security.token.Token<AMRMTokenIdentifier>
-  getAMRMToken(ApplicationId appId) throws YarnException, IOException;
+      getAMRMToken(ApplicationId appId) throws YarnException, IOException;
 
 
   /**
   /**
    * <p>
    * <p>

+ 6 - 10
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java

@@ -33,10 +33,10 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
 import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
-import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest;
@@ -199,15 +199,11 @@ public class YarnClientImpl extends YarnClient {
 
 
   public org.apache.hadoop.security.token.Token<AMRMTokenIdentifier>
   public org.apache.hadoop.security.token.Token<AMRMTokenIdentifier>
       getAMRMToken(ApplicationId appId) throws YarnException, IOException {
       getAMRMToken(ApplicationId appId) throws YarnException, IOException {
-    org.apache.hadoop.security.token.Token<AMRMTokenIdentifier> amrmToken = null;
-    ApplicationReport report = getApplicationReport(appId);
-    Token token = report.getAMRMToken();
+    Token token = getApplicationReport(appId).getAMRMToken();
+    org.apache.hadoop.security.token.Token<AMRMTokenIdentifier> amrmToken =
+        null;
     if (token != null) {
     if (token != null) {
-      InetSocketAddress address = getConfig().getSocketAddr(
-          YarnConfiguration.RM_SCHEDULER_ADDRESS,
-          YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS,
-          YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT);
-      amrmToken = ConverterUtils.convertFromYarn(token, address);
+      amrmToken = ConverterUtils.convertFromYarn(token, null);
     }
     }
     return amrmToken;
     return amrmToken;
   }
   }

+ 13 - 5
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RMProxy.java

@@ -29,23 +29,30 @@ import java.util.concurrent.TimeUnit;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.retry.RetryPolicies;
 import org.apache.hadoop.io.retry.RetryPolicies;
 import org.apache.hadoop.io.retry.RetryPolicy;
 import org.apache.hadoop.io.retry.RetryPolicy;
 import org.apache.hadoop.io.retry.RetryProxy;
 import org.apache.hadoop.io.retry.RetryProxy;
+import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+
+import com.google.common.annotations.VisibleForTesting;
 
 
 @InterfaceAudience.Public
 @InterfaceAudience.Public
 @InterfaceStability.Evolving
 @InterfaceStability.Evolving
+@SuppressWarnings("unchecked")
 public class RMProxy<T> {
 public class RMProxy<T> {
 
 
   private static final Log LOG = LogFactory.getLog(RMProxy.class);
   private static final Log LOG = LogFactory.getLog(RMProxy.class);
 
 
-  @SuppressWarnings("unchecked")
   public static <T> T createRMProxy(final Configuration conf,
   public static <T> T createRMProxy(final Configuration conf,
       final Class<T> protocol, InetSocketAddress rmAddress) throws IOException {
       final Class<T> protocol, InetSocketAddress rmAddress) throws IOException {
     RetryPolicy retryPolicy = createRetryPolicy(conf);
     RetryPolicy retryPolicy = createRetryPolicy(conf);
@@ -54,12 +61,11 @@ public class RMProxy<T> {
     return (T) RetryProxy.create(protocol, proxy, retryPolicy);
     return (T) RetryProxy.create(protocol, proxy, retryPolicy);
   }
   }
 
 
-  @SuppressWarnings("unchecked")
-  protected static <T> T getProxy(final Configuration conf,
+  private static <T> T getProxy(final Configuration conf,
       final Class<T> protocol, final InetSocketAddress rmAddress)
       final Class<T> protocol, final InetSocketAddress rmAddress)
       throws IOException {
       throws IOException {
-    return (T) UserGroupInformation.getCurrentUser().doAs(
-      new PrivilegedAction<Object>() {
+    return UserGroupInformation.getCurrentUser().doAs(
+      new PrivilegedAction<T>() {
 
 
         @Override
         @Override
         public T run() {
         public T run() {
@@ -68,6 +74,8 @@ public class RMProxy<T> {
       });
       });
   }
   }
 
 
+  @Private
+  @VisibleForTesting
   public static RetryPolicy createRetryPolicy(Configuration conf) {
   public static RetryPolicy createRetryPolicy(Configuration conf) {
     long rmConnectWaitMS =
     long rmConnectWaitMS =
         conf.getInt(
         conf.getInt(

+ 3 - 4
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/ServerRMProxy.java

@@ -27,7 +27,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.client.RMProxy;
 import org.apache.hadoop.yarn.client.RMProxy;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 
 
-public class ServerRMProxy<T> extends RMProxy<T>{
+public class ServerRMProxy<T> extends RMProxy<T> {
 
 
   private static final Log LOG = LogFactory.getLog(ServerRMProxy.class);
   private static final Log LOG = LogFactory.getLog(ServerRMProxy.class);
 
 
@@ -43,8 +43,7 @@ public class ServerRMProxy<T> extends RMProxy<T>{
         YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS,
         YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS,
         YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_ADDRESS,
         YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_ADDRESS,
         YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_PORT);
         YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_PORT);
-    }
-    else {
+    } else {
       String message = "Unsupported protocol found when creating the proxy " +
       String message = "Unsupported protocol found when creating the proxy " +
           "connection to ResourceManager: " +
           "connection to ResourceManager: " +
           ((protocol != null) ? protocol.getClass().getName() : "null");
           ((protocol != null) ? protocol.getClass().getName() : "null");
@@ -52,4 +51,4 @@ public class ServerRMProxy<T> extends RMProxy<T>{
       throw new IllegalStateException(message);
       throw new IllegalStateException(message);
     }
     }
   }
   }
-}
+}

+ 8 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java

@@ -61,6 +61,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptE
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptLaunchFailedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptLaunchFailedEvent;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
 /**
  * The launch of the AM itself.
  * The launch of the AM itself.
  */
  */
@@ -224,7 +226,7 @@ public class AMLauncher implements Runnable {
     }
     }
 
 
     // Add AMRMToken
     // Add AMRMToken
-    Token<AMRMTokenIdentifier> amrmToken = application.getAMRMToken();
+    Token<AMRMTokenIdentifier> amrmToken = getAMRMToken();
     if (amrmToken != null) {
     if (amrmToken != null) {
       credentials.addToken(amrmToken.getService(), amrmToken);
       credentials.addToken(amrmToken.getService(), amrmToken);
     }
     }
@@ -232,6 +234,11 @@ public class AMLauncher implements Runnable {
     credentials.writeTokenStorageToStream(dob);
     credentials.writeTokenStorageToStream(dob);
     container.setTokens(ByteBuffer.wrap(dob.getData(), 0, dob.getLength()));
     container.setTokens(ByteBuffer.wrap(dob.getData(), 0, dob.getLength()));
   }
   }
+
+  @VisibleForTesting
+  protected Token<AMRMTokenIdentifier> getAMRMToken() {
+    return application.getAMRMToken();
+  }
   
   
   @SuppressWarnings("unchecked")
   @SuppressWarnings("unchecked")
   public void run() {
   public void run() {

+ 8 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java

@@ -30,6 +30,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.delegation.DelegationKey;
 import org.apache.hadoop.security.token.delegation.DelegationKey;
@@ -43,9 +44,9 @@ import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
 import org.apache.hadoop.yarn.security.client.ClientToAMTokenIdentifier;
 import org.apache.hadoop.yarn.security.client.ClientToAMTokenIdentifier;
+import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
-import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppStoredEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppStoredEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
@@ -376,11 +377,16 @@ public abstract class RMStateStore {
   protected abstract void removeApplicationState(ApplicationState appState) 
   protected abstract void removeApplicationState(ApplicationState appState) 
                                                              throws Exception;
                                                              throws Exception;
 
 
+  // TODO: This should eventually become cluster-Id + "AM_RM_TOKEN_SERVICE". See
+  // YARN-986 
+  public static final Text AM_RM_TOKEN_SERVICE = new Text(
+    "AM_RM_TOKEN_SERVICE");
+  
   private Credentials getTokensFromAppAttempt(RMAppAttempt appAttempt) {
   private Credentials getTokensFromAppAttempt(RMAppAttempt appAttempt) {
     Credentials credentials = new Credentials();
     Credentials credentials = new Credentials();
     Token<AMRMTokenIdentifier> appToken = appAttempt.getAMRMToken();
     Token<AMRMTokenIdentifier> appToken = appAttempt.getAMRMToken();
     if(appToken != null){
     if(appToken != null){
-      credentials.addToken(appToken.getService(), appToken);
+      credentials.addToken(AM_RM_TOKEN_SERVICE, appToken);
     }
     }
     Token<ClientToAMTokenIdentifier> clientToAMToken =
     Token<ClientToAMTokenIdentifier> clientToAMToken =
         appAttempt.getClientToAMToken();
         appAttempt.getClientToAMToken();

+ 5 - 21
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java

@@ -20,7 +20,6 @@ package org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt;
 
 
 import static org.apache.hadoop.yarn.util.StringHelper.pjoin;
 import static org.apache.hadoop.yarn.util.StringHelper.pjoin;
 
 
-import java.net.InetSocketAddress;
 import java.net.URI;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.ArrayList;
@@ -41,7 +40,6 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.http.HttpConfig;
 import org.apache.hadoop.http.HttpConfig;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.ExitUtil;
 import org.apache.hadoop.util.ExitUtil;
@@ -62,7 +60,6 @@ import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
-import org.apache.hadoop.yarn.security.AMRMTokenSelector;
 import org.apache.hadoop.yarn.security.client.ClientToAMTokenIdentifier;
 import org.apache.hadoop.yarn.security.client.ClientToAMTokenIdentifier;
 import org.apache.hadoop.yarn.security.client.ClientToAMTokenSelector;
 import org.apache.hadoop.yarn.security.client.ClientToAMTokenSelector;
 import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
 import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
@@ -684,15 +681,11 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
             appAttemptTokens.getAllTokens());
             appAttemptTokens.getAllTokens());
     }
     }
 
 
-    InetSocketAddress serviceAddr =
-        conf.getSocketAddr(YarnConfiguration.RM_SCHEDULER_ADDRESS,
-          YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS,
-          YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT);
-    AMRMTokenSelector appTokenSelector = new AMRMTokenSelector();
+    // Only one AMRMToken is stored per-attempt, so this should be fine. Can't
+    // use TokenSelector as service may change - think fail-over.
     this.amrmToken =
     this.amrmToken =
-        appTokenSelector.selectToken(
-          SecurityUtil.buildTokenService(serviceAddr),
-          appAttemptTokens.getAllTokens());
+        (Token<AMRMTokenIdentifier>) appAttemptTokens
+          .getToken(RMStateStore.AM_RM_TOKEN_SERVICE);
 
 
     // For now, no need to populate tokens back to AMRMTokenSecretManager,
     // For now, no need to populate tokens back to AMRMTokenSecretManager,
     // because running attempts are rebooted. Later in work-preserve restart,
     // because running attempts are rebooted. Later in work-preserve restart,
@@ -736,18 +729,9 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
       // create AMRMToken
       // create AMRMToken
       AMRMTokenIdentifier id =
       AMRMTokenIdentifier id =
           new AMRMTokenIdentifier(appAttempt.applicationAttemptId);
           new AMRMTokenIdentifier(appAttempt.applicationAttemptId);
-      Token<AMRMTokenIdentifier> amRmToken =
+      appAttempt.amrmToken =
           new Token<AMRMTokenIdentifier>(id,
           new Token<AMRMTokenIdentifier>(id,
             appAttempt.rmContext.getAMRMTokenSecretManager());
             appAttempt.rmContext.getAMRMTokenSecretManager());
-      InetSocketAddress serviceAddr =
-          appAttempt.conf.getSocketAddr(YarnConfiguration.RM_SCHEDULER_ADDRESS,
-            YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS,
-            YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT);
-      // normally the client should set the service after acquiring the
-      // token, but this token is directly provided to the AMs
-      SecurityUtil.setTokenService(amRmToken, serviceAddr);
-
-      appAttempt.amrmToken = amRmToken;
 
 
       // Add the application to the scheduler
       // Add the application to the scheduler
       appAttempt.eventHandler.handle(
       appAttempt.eventHandler.handle(

+ 17 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRMWithCustomAMLauncher.java

@@ -18,9 +18,15 @@
 
 
 package org.apache.hadoop.yarn.server.resourcemanager;
 package org.apache.hadoop.yarn.server.resourcemanager;
 
 
+import java.net.InetSocketAddress;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
 import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
 import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncher;
 import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncher;
 import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher;
 import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher;
@@ -52,6 +58,17 @@ public class MockRMWithCustomAMLauncher extends MockRM {
               ContainerId containerId) {
               ContainerId containerId) {
             return containerManager;
             return containerManager;
           }
           }
+          @Override
+          protected Token<AMRMTokenIdentifier> getAMRMToken() {
+            Token<AMRMTokenIdentifier> amRmToken = super.getAMRMToken();
+            InetSocketAddress serviceAddr =
+                getConfig().getSocketAddr(
+                  YarnConfiguration.RM_SCHEDULER_ADDRESS,
+                  YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS,
+                  YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT);
+            SecurityUtil.setTokenService(amRmToken, serviceAddr);
+            return amRmToken;
+          }
         };
         };
       }
       }
     };
     };

+ 23 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAMAuthorization.java

@@ -33,7 +33,10 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.io.DataInputByteBuffer;
 import org.apache.hadoop.io.DataInputByteBuffer;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
 import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
 import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
 import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
 import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
@@ -49,6 +52,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
@@ -142,6 +146,19 @@ public class TestAMAuthorization {
     protected ApplicationMasterService createApplicationMasterService() {
     protected ApplicationMasterService createApplicationMasterService() {
       return new ApplicationMasterService(getRMContext(), this.scheduler);
       return new ApplicationMasterService(getRMContext(), this.scheduler);
     }
     }
+
+    @SuppressWarnings("unchecked")
+    public static Token<? extends TokenIdentifier> setupAndReturnAMRMToken(
+        InetSocketAddress rmBindAddress,
+        Collection<Token<? extends TokenIdentifier>> allTokens) {
+      for (Token<? extends TokenIdentifier> token : allTokens) {
+        if (token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) {
+          SecurityUtil.setTokenService(token, rmBindAddress);
+          return (Token<AMRMTokenIdentifier>) token;
+        }
+      }
+      return null;
+    }
   }
   }
 
 
   @Test
   @Test
@@ -178,8 +195,12 @@ public class TestAMAuthorization {
     UserGroupInformation currentUser = UserGroupInformation
     UserGroupInformation currentUser = UserGroupInformation
         .createRemoteUser(applicationAttemptId.toString());
         .createRemoteUser(applicationAttemptId.toString());
     Credentials credentials = containerManager.getContainerCredentials();
     Credentials credentials = containerManager.getContainerCredentials();
-    currentUser.addCredentials(credentials);
-
+    final InetSocketAddress rmBindAddress =
+        rm.getApplicationMasterService().getBindAddress();
+    Token<? extends TokenIdentifier> amRMToken =
+        MockRMWithAMS.setupAndReturnAMRMToken(rmBindAddress,
+          credentials.getAllTokens());
+    currentUser.addToken(amRMToken);
     ApplicationMasterProtocol client = currentUser
     ApplicationMasterProtocol client = currentUser
         .doAs(new PrivilegedAction<ApplicationMasterProtocol>() {
         .doAs(new PrivilegedAction<ApplicationMasterProtocol>() {
           @Override
           @Override

+ 0 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java

@@ -22,7 +22,6 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.Map;
 
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.Log;

+ 14 - 7
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerUtils.java

@@ -22,6 +22,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.mock;
 
 
+import java.net.InetSocketAddress;
 import java.security.PrivilegedAction;
 import java.security.PrivilegedAction;
 import java.util.Collections;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashMap;
@@ -32,6 +33,8 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
 import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
@@ -47,7 +50,6 @@ import org.apache.hadoop.yarn.exceptions.InvalidResourceBlacklistRequestExceptio
 import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException;
 import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
 import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
 import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
-import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
 import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MockRMWithAMS;
 import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MockRMWithAMS;
 import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MyContainerManager;
 import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MyContainerManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
@@ -274,7 +276,7 @@ public class TestSchedulerUtils {
   public void testValidateResourceBlacklistRequest() throws Exception {
   public void testValidateResourceBlacklistRequest() throws Exception {
 
 
     MyContainerManager containerManager = new MyContainerManager();
     MyContainerManager containerManager = new MyContainerManager();
-    final MockRM rm =
+    final MockRMWithAMS rm =
         new MockRMWithAMS(new YarnConfiguration(), containerManager);
         new MockRMWithAMS(new YarnConfiguration(), containerManager);
     rm.start();
     rm.start();
 
 
@@ -298,13 +300,18 @@ public class TestSchedulerUtils {
     UserGroupInformation currentUser = 
     UserGroupInformation currentUser = 
         UserGroupInformation.createRemoteUser(applicationAttemptId.toString());
         UserGroupInformation.createRemoteUser(applicationAttemptId.toString());
     Credentials credentials = containerManager.getContainerCredentials();
     Credentials credentials = containerManager.getContainerCredentials();
-    currentUser.addCredentials(credentials);
-    ApplicationMasterProtocol client = currentUser
-        .doAs(new PrivilegedAction<ApplicationMasterProtocol>() {
+    final InetSocketAddress rmBindAddress =
+        rm.getApplicationMasterService().getBindAddress();
+    Token<? extends TokenIdentifier> amRMToken =
+        MockRMWithAMS.setupAndReturnAMRMToken(rmBindAddress,
+          credentials.getAllTokens());
+    currentUser.addToken(amRMToken);
+    ApplicationMasterProtocol client =
+        currentUser.doAs(new PrivilegedAction<ApplicationMasterProtocol>() {
           @Override
           @Override
           public ApplicationMasterProtocol run() {
           public ApplicationMasterProtocol run() {
-            return (ApplicationMasterProtocol) rpc.getProxy(ApplicationMasterProtocol.class, rm
-                .getApplicationMasterService().getBindAddress(), conf);
+            return (ApplicationMasterProtocol) rpc.getProxy(
+              ApplicationMasterProtocol.class, rmBindAddress, conf);
           }
           }
         });
         });
 
 

+ 17 - 6
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestAMRMTokens.java

@@ -18,6 +18,7 @@
 
 
 package org.apache.hadoop.yarn.server.resourcemanager.security;
 package org.apache.hadoop.yarn.server.resourcemanager.security;
 
 
+import java.net.InetSocketAddress;
 import java.security.PrivilegedAction;
 import java.security.PrivilegedAction;
 import java.util.Arrays;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collection;
@@ -30,6 +31,8 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
 import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
@@ -81,7 +84,7 @@ public class TestAMRMTokens {
   public void testTokenExpiry() throws Exception {
   public void testTokenExpiry() throws Exception {
 
 
     MyContainerManager containerManager = new MyContainerManager();
     MyContainerManager containerManager = new MyContainerManager();
-    final MockRM rm =
+    final MockRMWithAMS rm =
         new MockRMWithAMS(conf, containerManager);
         new MockRMWithAMS(conf, containerManager);
     rm.start();
     rm.start();
 
 
@@ -111,8 +114,12 @@ public class TestAMRMTokens {
           UserGroupInformation
           UserGroupInformation
             .createRemoteUser(applicationAttemptId.toString());
             .createRemoteUser(applicationAttemptId.toString());
       Credentials credentials = containerManager.getContainerCredentials();
       Credentials credentials = containerManager.getContainerCredentials();
-      currentUser.addCredentials(credentials);
-
+      final InetSocketAddress rmBindAddress =
+          rm.getApplicationMasterService().getBindAddress();
+      Token<? extends TokenIdentifier> amRMToken =
+          MockRMWithAMS.setupAndReturnAMRMToken(rmBindAddress,
+            credentials.getAllTokens());
+      currentUser.addToken(amRMToken);
       rmClient = createRMClient(rm, conf, rpc, currentUser);
       rmClient = createRMClient(rm, conf, rpc, currentUser);
 
 
       RegisterApplicationMasterRequest request =
       RegisterApplicationMasterRequest request =
@@ -164,7 +171,7 @@ public class TestAMRMTokens {
   public void testMasterKeyRollOver() throws Exception {
   public void testMasterKeyRollOver() throws Exception {
 
 
     MyContainerManager containerManager = new MyContainerManager();
     MyContainerManager containerManager = new MyContainerManager();
-    final MockRM rm =
+    final MockRMWithAMS rm =
         new MockRMWithAMS(conf, containerManager);
         new MockRMWithAMS(conf, containerManager);
     rm.start();
     rm.start();
 
 
@@ -194,8 +201,12 @@ public class TestAMRMTokens {
           UserGroupInformation
           UserGroupInformation
             .createRemoteUser(applicationAttemptId.toString());
             .createRemoteUser(applicationAttemptId.toString());
       Credentials credentials = containerManager.getContainerCredentials();
       Credentials credentials = containerManager.getContainerCredentials();
-      currentUser.addCredentials(credentials);
-
+      final InetSocketAddress rmBindAddress =
+          rm.getApplicationMasterService().getBindAddress();
+      Token<? extends TokenIdentifier> amRMToken =
+          MockRMWithAMS.setupAndReturnAMRMToken(rmBindAddress,
+            credentials.getAllTokens());
+      currentUser.addToken(amRMToken);
       rmClient = createRMClient(rm, conf, rpc, currentUser);
       rmClient = createRMClient(rm, conf, rpc, currentUser);
 
 
       RegisterApplicationMasterRequest request =
       RegisterApplicationMasterRequest request =