瀏覽代碼

YARN-5910. Support for multi-cluster delegation tokens. Contributed by Jian He

Jason Lowe 8 年之前
父節點
當前提交
69fa81679f
共有 21 個文件被更改,包括 448 次插入146 次删除
  1. 2 0
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
  2. 18 0
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
  3. 36 0
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
  4. 39 0
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java
  5. 16 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerLaunchContext.java
  6. 6 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
  7. 2 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
  8. 26 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerLaunchContextPBImpl.java
  9. 10 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
  10. 32 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java
  11. 17 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
  12. 6 24
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
  13. 9 14
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
  14. 48 26
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java
  15. 24 12
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
  16. 10 31
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java
  17. 17 20
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
  18. 5 3
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java
  19. 1 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java
  20. 5 4
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java
  21. 119 8
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java

+ 2 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java

@@ -1013,4 +1013,6 @@ public interface MRJobConfig {
    * A comma-separated list of properties whose value will be redacted.
    */
   String MR_JOB_REDACTED_PROPERTIES = "mapreduce.job.redacted-properties";
+
+  String MR_JOB_SEND_TOKEN_CONF = "mapreduce.job.send-token-conf";
 }

+ 18 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml

@@ -1989,4 +1989,22 @@
   <name>mapreduce.job.redacted-properties</name>
   <value></value>
 </property>
+
+<property>
+  <description>
+    This configuration is a regex expression. The list of configurations that
+    match the regex expression will be sent to RM. RM will use these
+    configurations for renewing tokens.
+    This configuration is added for below scenario: User needs to run distcp
+    jobs across two clusters, but the RM does not have necessary hdfs
+    configurations to connect to the remote hdfs cluster. Hence, user relies on
+    this config to send the configurations to RM and RM uses these
+    configurations to renew tokens.
+    For example the following regex expression indicates the minimum required
+    configs for RM to connect to a remote hdfs cluster:
+    dfs.nameservices|^dfs.namenode.rpc-address.*$|^dfs.ha.namenodes.*$|^dfs.client.failover.proxy.provider.*$|dfs.namenode.kerberos.principal
+  </description>
+  <name>mapreduce.job.send-token-conf</name>
+  <value></value>
+</property>
 </configuration>

+ 36 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java

@@ -24,6 +24,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Vector;
@@ -499,6 +500,12 @@ public class YARNRunner implements ClientProtocol {
         ContainerLaunchContext.newInstance(localResources, environment,
           vargsFinal, null, securityTokens, acls);
 
+    String regex = conf.get(MRJobConfig.MR_JOB_SEND_TOKEN_CONF);
+    if (regex != null && !regex.isEmpty()) {
+      setTokenRenewerConf(amContainer, conf, regex);
+    }
+
+
     Collection<String> tagsFromConf =
         jobConf.getTrimmedStringCollection(MRJobConfig.JOB_TAGS);
 
@@ -576,6 +583,35 @@ public class YARNRunner implements ClientProtocol {
     return appContext;
   }
 
+  private void setTokenRenewerConf(ContainerLaunchContext context,
+      Configuration conf, String regex) throws IOException {
+    DataOutputBuffer dob = new DataOutputBuffer();
+    Configuration copy = new Configuration(false);
+    copy.clear();
+    int count = 0;
+    for (Map.Entry<String, String> map : conf) {
+      String key = map.getKey();
+      String val = map.getValue();
+      if (key.matches(regex)) {
+        copy.set(key, val);
+        count++;
+      }
+    }
+    copy.write(dob);
+    ByteBuffer appConf = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
+    LOG.info("Send configurations that match regex expression: " + regex
+        + " , total number of configs: " + count + ", total size : " + dob
+        .getLength() + " bytes.");
+    if (LOG.isDebugEnabled()) {
+      for (Iterator<Map.Entry<String, String>> itor = copy.iterator(); itor
+          .hasNext(); ) {
+        Map.Entry<String, String> entry = itor.next();
+        LOG.info(entry.getKey() + " ===> " + entry.getValue());
+      }
+    }
+    context.setTokensConf(appConf);
+  }
+
   @Override
   public void setJobPriority(JobID arg0, String arg1) throws IOException,
       InterruptedException {

+ 39 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java

@@ -47,6 +47,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
@@ -99,6 +100,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
+import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.util.Records;
 import org.apache.log4j.Appender;
 import org.apache.log4j.Layout;
@@ -106,6 +108,7 @@ import org.apache.log4j.Logger;
 import org.apache.log4j.SimpleLayout;
 import org.apache.log4j.WriterAppender;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.invocation.InvocationOnMock;
@@ -675,4 +678,40 @@ public class TestYARNRunner {
     return yarnRunner.createApplicationSubmissionContext(jobConf,
         testWorkDir.toString(), new Credentials());
   }
+
+  // Test configs that match regex expression should be set in
+  // containerLaunchContext
+  @Test
+  public void testSendJobConf() throws IOException {
+    JobConf jobConf = new JobConf();
+    jobConf.set("dfs.nameservices", "mycluster1,mycluster2");
+    jobConf.set("dfs.namenode.rpc-address.mycluster2.nn1", "123.0.0.1");
+    jobConf.set("dfs.namenode.rpc-address.mycluster2.nn2", "123.0.0.2");
+    jobConf.set("dfs.ha.namenodes.mycluster2", "nn1,nn2");
+    jobConf.set("dfs.client.failover.proxy.provider.mycluster2", "provider");
+    jobConf.set("hadoop.tmp.dir", "testconfdir");
+    jobConf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
+        "kerberos");
+    jobConf.set("mapreduce.job.send-token-conf",
+        "dfs.nameservices|^dfs.namenode.rpc-address.*$|^dfs.ha.namenodes.*$"
+            + "|^dfs.client.failover.proxy.provider.*$"
+            + "|dfs.namenode.kerberos.principal");
+    UserGroupInformation.setConfiguration(jobConf);
+
+    YARNRunner yarnRunner = new YARNRunner(jobConf);
+    ApplicationSubmissionContext submissionContext =
+        buildSubmitContext(yarnRunner, jobConf);
+    Configuration confSent = BuilderUtils.parseTokensConf(submissionContext);
+
+    // configs that match regex should be included
+    Assert.assertTrue(confSent.get("dfs.namenode.rpc-address.mycluster2.nn1")
+        .equals("123.0.0.1"));
+    Assert.assertTrue(confSent.get("dfs.namenode.rpc-address.mycluster2.nn2")
+        .equals("123.0.0.2"));
+
+    // configs that aren't matching regex should not be included
+    Assert.assertTrue(confSent.get("hadoop.tmp.dir") == null || !confSent
+        .get("hadoop.tmp.dir").equals("testconfdir"));
+    UserGroupInformation.reset();
+  }
 }

+ 16 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerLaunchContext.java

@@ -107,6 +107,22 @@ public abstract class ContainerLaunchContext {
   @Stable
   public abstract void setTokens(ByteBuffer tokens);
 
+  /**
+   * Get the configuration used by RM to renew tokens.
+   * @return The configuration used by RM to renew the tokens.
+   */
+  @Public
+  @Unstable
+  public abstract ByteBuffer getTokensConf();
+
+  /**
+   * Set the configuration used by RM to renew the tokens.
+   * @param tokensConf The configuration used by RM to renew the tokens
+   */
+  @Public
+  @Unstable
+  public abstract void setTokensConf(ByteBuffer tokensConf);
+
   /**
    * Get <code>LocalResource</code> required by the container.
    * @return all <code>LocalResource</code> required by the container

+ 6 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java

@@ -526,7 +526,12 @@ public class YarnConfiguration extends Configuration {
      RM_PREFIX + "delegation.token.max-lifetime";
   public static final long RM_DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT =
     7*24*60*60*1000; // 7 days
-  
+
+  public static final String RM_DELEGATION_TOKEN_MAX_CONF_SIZE =
+      RM_PREFIX + "delegation-token.max-conf-size-bytes";
+  public static final int DEFAULT_RM_DELEGATION_TOKEN_MAX_CONF_SIZE_BYTES =
+      12800;
+
   public static final String RECOVERY_ENABLED = RM_PREFIX + "recovery.enabled";
   public static final boolean DEFAULT_RM_RECOVERY_ENABLED = false;
 

+ 2 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto

@@ -548,6 +548,8 @@ message ContainerLaunchContextProto {
   repeated string command = 5;
   repeated ApplicationACLMapProto application_ACLs = 6;
   optional ContainerRetryContextProto container_retry_context = 7;
+  optional bytes tokens_conf = 8;
+
 }
 
 message ContainerStatusProto {

+ 26 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerLaunchContextPBImpl.java

@@ -54,6 +54,7 @@ extends ContainerLaunchContext {
   
   private Map<String, LocalResource> localResources = null;
   private ByteBuffer tokens = null;
+  private ByteBuffer tokensConf = null;
   private Map<String, ByteBuffer> serviceData = null;
   private Map<String, String> environment = null;
   private List<String> commands = null;
@@ -111,6 +112,9 @@ extends ContainerLaunchContext {
     if (this.tokens != null) {
       builder.setTokens(convertToProtoFormat(this.tokens));
     }
+    if (this.tokensConf != null) {
+      builder.setTokensConf(convertToProtoFormat(this.tokensConf));
+    }
     if (this.serviceData != null) {
       addServiceDataToProto();
     }
@@ -267,6 +271,28 @@ extends ContainerLaunchContext {
     this.tokens = tokens;
   }
 
+  @Override
+  public ByteBuffer getTokensConf() {
+    ContainerLaunchContextProtoOrBuilder p = viaProto ? proto : builder;
+    if (this.tokensConf != null) {
+      return this.tokensConf;
+    }
+    if (!p.hasTokensConf()) {
+      return null;
+    }
+    this.tokensConf = convertFromProtoFormat(p.getTokensConf());
+    return this.tokensConf;
+  }
+
+  @Override
+  public void setTokensConf(ByteBuffer tokensConf) {
+    maybeInitBuilder();
+    if (tokensConf == null) {
+      builder.clearTokensConf();
+    }
+    this.tokensConf = tokensConf;
+  }
+
   @Override
   public Map<String, ByteBuffer> getServiceData() {
     initServiceData();

+ 10 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml

@@ -703,6 +703,16 @@
     <value>30000</value>
   </property>
 
+  <property>
+    <description>Maximum size in bytes for configurations that can be provided
+      by application to RM for delegation token renewal.
+      By experiment, it's roughly 128 bytes per key-value pair.
+      The default value 12800 allows roughly 100 configs, may be less.
+    </description>
+    <name>yarn.resourcemanager.delegation-token.max-conf-size-bytes</name>
+    <value>12800</value>
+  </property>
+
   <property>
   <description>If true, ResourceManager will have proxy-user privileges.
     Use case: In a secure cluster, YARN requires the user hdfs delegation-tokens to

+ 32 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java

@@ -29,8 +29,11 @@ import java.util.Map;
 import java.util.Set;
 
 import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.DataInputByteBuffer;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 import org.apache.hadoop.yarn.api.records.AMCommand;
@@ -62,6 +65,8 @@ import org.apache.hadoop.yarn.api.records.ResourceUtilization;
 import org.apache.hadoop.yarn.api.records.Token;
 import org.apache.hadoop.yarn.api.records.URL;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
@@ -496,4 +501,31 @@ public class BuilderUtils {
 
     return response;
   }
+
+  public static Credentials parseCredentials(
+      ApplicationSubmissionContext application) throws IOException {
+    Credentials credentials = new Credentials();
+    DataInputByteBuffer dibb = new DataInputByteBuffer();
+    ByteBuffer tokens = application.getAMContainerSpec().getTokens();
+    if (tokens != null) {
+      dibb.reset(tokens);
+      credentials.readTokenStorageStream(dibb);
+      tokens.rewind();
+    }
+    return credentials;
+  }
+
+  public static Configuration parseTokensConf(
+      ApplicationSubmissionContext context) throws IOException {
+    ByteBuffer tokensConf = context.getAMContainerSpec().getTokensConf();
+    if (tokensConf == null) {
+      return null;
+    }
+    DataInputByteBuffer dibb = new DataInputByteBuffer();
+    dibb.reset(tokensConf);
+    Configuration appConf = new Configuration(false);
+    appConf.readFields(dibb);
+    tokensConf.rewind();
+    return appConf;
+  }
 }

+ 17 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java

@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
 import java.security.AccessControlException;
 import java.text.MessageFormat;
 import java.util.ArrayList;
@@ -614,6 +615,21 @@ public class ClientRMService extends AbstractService implements
       return SubmitApplicationResponse.newInstance();
     }
 
+    ByteBuffer tokenConf =
+        submissionContext.getAMContainerSpec().getTokensConf();
+    if (tokenConf != null) {
+      int maxSize = getConfig()
+          .getInt(YarnConfiguration.RM_DELEGATION_TOKEN_MAX_CONF_SIZE,
+              YarnConfiguration.DEFAULT_RM_DELEGATION_TOKEN_MAX_CONF_SIZE_BYTES);
+      LOG.info("Using app provided configurations for delegation token renewal,"
+          + " total size = " + tokenConf.capacity());
+      if (tokenConf.capacity() > maxSize) {
+        throw new YarnException(
+            "Exceed " + YarnConfiguration.RM_DELEGATION_TOKEN_MAX_CONF_SIZE
+                + " = " + maxSize + " bytes, current conf size = "
+                + tokenConf.capacity() + " bytes.");
+      }
+    }
     if (submissionContext.getQueue() == null) {
       submissionContext.setQueue(YarnConfiguration.DEFAULT_QUEUE_NAME);
     }
@@ -648,8 +664,7 @@ public class ClientRMService extends AbstractService implements
       RMAuditLogger.logSuccess(user, AuditConstants.SUBMIT_APP_REQUEST,
           "ClientRMService", applicationId, callerContext);
     } catch (YarnException e) {
-      LOG.info("Exception in submitting application with id " +
-          applicationId.getId(), e);
+      LOG.info("Exception in submitting " + applicationId, e);
       RMAuditLogger.logFailure(user, AuditConstants.SUBMIT_APP_REQUEST,
           e.getMessage(), "ClientRMService",
           "Exception in submitting application", applicationId, callerContext);

+ 6 - 24
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java

@@ -17,18 +17,14 @@
  */
 package org.apache.hadoop.yarn.server.resourcemanager;
 
-import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.util.LinkedList;
 import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.DataInputByteBuffer;
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.security.AccessControlException;
-import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
@@ -299,14 +295,14 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
     // constructor.
     RMAppImpl application = createAndPopulateNewRMApp(
         submissionContext, submitTime, user, false, -1);
-    Credentials credentials = null;
     try {
-      credentials = parseCredentials(submissionContext);
       if (UserGroupInformation.isSecurityEnabled()) {
         this.rmContext.getDelegationTokenRenewer()
-            .addApplicationAsync(applicationId, credentials,
+            .addApplicationAsync(applicationId,
+                BuilderUtils.parseCredentials(submissionContext),
                 submissionContext.getCancelTokensWhenComplete(),
-                application.getUser());
+                application.getUser(),
+                BuilderUtils.parseTokensConf(submissionContext));
       } else {
         // Dispatcher is not yet started at this time, so these START events
         // enqueued should be guaranteed to be first processed when dispatcher
@@ -315,11 +311,10 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
             .handle(new RMAppEvent(applicationId, RMAppEventType.START));
       }
     } catch (Exception e) {
-      LOG.warn("Unable to parse credentials.", e);
+      LOG.warn("Unable to parse credentials for " + applicationId, e);
       // Sending APP_REJECTED is fine, since we assume that the
       // RMApp is in NEW state and thus we haven't yet informed the
       // scheduler about the existence of the application
-      assert application.getState() == RMAppState.NEW;
       this.rmContext.getDispatcher().getEventHandler()
           .handle(new RMAppEvent(applicationId,
               RMAppEventType.APP_REJECTED, e.getMessage()));
@@ -515,20 +510,7 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
     
     return null;
   }
-  
-  protected Credentials parseCredentials(
-      ApplicationSubmissionContext application) throws IOException {
-    Credentials credentials = new Credentials();
-    DataInputByteBuffer dibb = new DataInputByteBuffer();
-    ByteBuffer tokens = application.getAMContainerSpec().getTokens();
-    if (tokens != null) {
-      dibb.reset(tokens);
-      credentials.readTokenStorageStream(dibb);
-      tokens.rewind();
-    }
-    return credentials;
-  }
-  
+
   @Override
   public void recover(RMState state) throws Exception {
     RMStateStore store = rmContext.getStateStore();

+ 9 - 14
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java

@@ -1120,9 +1120,12 @@ public class RMAppImpl implements RMApp, Recoverable {
         try {
           app.rmContext.getDelegationTokenRenewer()
               .addApplicationAsyncDuringRecovery(app.getApplicationId(),
-                  app.parseCredentials(),
+                  BuilderUtils.parseCredentials(app.submissionContext),
                   app.submissionContext.getCancelTokensWhenComplete(),
-                  app.getUser());
+                  app.getUser(),
+                  BuilderUtils.parseTokensConf(app.submissionContext));
+          // set the memory free
+          app.submissionContext.getAMContainerSpec().setTokensConf(null);
         } catch (Exception e) {
           String msg = "Failed to fetch user credentials from application:"
               + e.getMessage();
@@ -1175,6 +1178,8 @@ public class RMAppImpl implements RMApp, Recoverable {
           app.submissionContext, false, app.applicationPriority));
       // send the ATS create Event
       app.sendATSCreateEvent();
+      // Set the memory free after submission context is persisted
+      app.submissionContext.getAMContainerSpec().setTokensConf(null);
     }
   }
 
@@ -1490,6 +1495,8 @@ public class RMAppImpl implements RMApp, Recoverable {
           .applicationFinished(app, finalState);
       app.rmContext.getSystemMetricsPublisher()
           .appFinished(app, finalState, app.finishTime);
+      // set the memory free
+      app.submissionContext.getAMContainerSpec().setTokensConf(null);
     };
   }
 
@@ -1699,18 +1706,6 @@ public class RMAppImpl implements RMApp, Recoverable {
     return this.amReq; 
   }
 
-  protected Credentials parseCredentials() throws IOException {
-    Credentials credentials = new Credentials();
-    DataInputByteBuffer dibb = new DataInputByteBuffer();
-    ByteBuffer tokens = submissionContext.getAMContainerSpec().getTokens();
-    if (tokens != null) {
-      dibb.reset(tokens);
-      credentials.readTokenStorageStream(dibb);
-      tokens.rewind();
-    }
-    return credentials;
-  }
-
   @Override
   public Map<NodeId, LogAggregationReport> getLogAggregationReportsForApp() {
     try {

+ 48 - 26
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java

@@ -29,6 +29,7 @@ import java.util.Date;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.Timer;
@@ -379,43 +380,43 @@ public class DelegationTokenRenewer extends AbstractService {
    * @param applicationId added application
    * @param ts tokens
    * @param shouldCancelAtEnd true if tokens should be canceled when the app is
-   * done else false. 
+   * done else false.
    * @param user user
+   * @param tokenConf tokenConf sent by the app-submitter
    */
   public void addApplicationAsync(ApplicationId applicationId, Credentials ts,
-      boolean shouldCancelAtEnd, String user) {
+      boolean shouldCancelAtEnd, String user, Configuration tokenConf) {
     processDelegationTokenRenewerEvent(new DelegationTokenRenewerAppSubmitEvent(
-      applicationId, ts, shouldCancelAtEnd, user));
+      applicationId, ts, shouldCancelAtEnd, user, tokenConf));
   }
 
   /**
    * Asynchronously add application tokens for renewal.
-   *
-   * @param applicationId
+   *  @param applicationId
    *          added application
    * @param ts
    *          tokens
    * @param shouldCancelAtEnd
    *          true if tokens should be canceled when the app is done else false.
-   * @param user
-   *          user
+   * @param user user
+   * @param tokenConf tokenConf sent by the app-submitter
    */
   public void addApplicationAsyncDuringRecovery(ApplicationId applicationId,
-      Credentials ts, boolean shouldCancelAtEnd, String user) {
+      Credentials ts, boolean shouldCancelAtEnd, String user,
+      Configuration tokenConf) {
     processDelegationTokenRenewerEvent(
         new DelegationTokenRenewerAppRecoverEvent(applicationId, ts,
-            shouldCancelAtEnd, user));
+            shouldCancelAtEnd, user, tokenConf));
   }
 
-  /**
-   * Synchronously renew delegation tokens.
-   * @param user user
-   */
+
+  // Only for testing
+  // Synchronously renew delegation tokens.
   public void addApplicationSync(ApplicationId applicationId, Credentials ts,
       boolean shouldCancelAtEnd, String user) throws IOException,
       InterruptedException {
     handleAppSubmitEvent(new DelegationTokenRenewerAppSubmitEvent(
-      applicationId, ts, shouldCancelAtEnd, user));
+      applicationId, ts, shouldCancelAtEnd, user, new Configuration()));
   }
 
   private void handleAppSubmitEvent(AbstractDelegationTokenRenewerAppEvent evt)
@@ -455,8 +456,27 @@ public class DelegationTokenRenewer extends AbstractService {
 
         DelegationTokenToRenew dttr = allTokens.get(token);
         if (dttr == null) {
+          Configuration tokenConf;
+          if (evt.tokenConf != null) {
+            // Override conf with app provided conf - this is required in cases
+            // where RM does not have the required conf to communicate with
+            // remote hdfs cluster. The conf is provided by the application
+            // itself.
+            tokenConf = evt.tokenConf;
+            LOG.info("Using app provided token conf for renewal,"
+                + " number of configs = " + tokenConf.size());
+            if (LOG.isDebugEnabled()) {
+              for (Iterator<Map.Entry<String, String>> itor =
+                   tokenConf.iterator(); itor.hasNext(); ) {
+                Map.Entry<String, String> entry = itor.next();
+                LOG.info(entry.getKey() + " ===> " + entry.getValue());
+              }
+            }
+          }  else {
+            tokenConf = getConfig();
+          }
           dttr = new DelegationTokenToRenew(Arrays.asList(applicationId), token,
-              getConfig(), now, shouldCancelAtEnd, evt.getUser());
+              tokenConf, now, shouldCancelAtEnd, evt.getUser());
           try {
             renewToken(dttr);
           } catch (IOException ioe) {
@@ -926,22 +946,22 @@ public class DelegationTokenRenewer extends AbstractService {
   }
 
   static class DelegationTokenRenewerAppSubmitEvent
-      extends
-        AbstractDelegationTokenRenewerAppEvent {
+      extends AbstractDelegationTokenRenewerAppEvent {
     public DelegationTokenRenewerAppSubmitEvent(ApplicationId appId,
-        Credentials credentails, boolean shouldCancelAtEnd, String user) {
+        Credentials credentails, boolean shouldCancelAtEnd, String user,
+        Configuration tokenConf) {
       super(appId, credentails, shouldCancelAtEnd, user,
-          DelegationTokenRenewerEventType.VERIFY_AND_START_APPLICATION);
+          DelegationTokenRenewerEventType.VERIFY_AND_START_APPLICATION, tokenConf);
     }
   }
 
   static class DelegationTokenRenewerAppRecoverEvent
-      extends
-        AbstractDelegationTokenRenewerAppEvent {
+      extends AbstractDelegationTokenRenewerAppEvent {
     public DelegationTokenRenewerAppRecoverEvent(ApplicationId appId,
-        Credentials credentails, boolean shouldCancelAtEnd, String user) {
+        Credentials credentails, boolean shouldCancelAtEnd, String user,
+        Configuration tokenConf) {
       super(appId, credentails, shouldCancelAtEnd, user,
-          DelegationTokenRenewerEventType.RECOVER_APPLICATION);
+          DelegationTokenRenewerEventType.RECOVER_APPLICATION, tokenConf);
     }
   }
 
@@ -949,16 +969,18 @@ public class DelegationTokenRenewer extends AbstractService {
       DelegationTokenRenewerEvent {
 
     private Credentials credentials;
+    private Configuration tokenConf;
     private boolean shouldCancelAtEnd;
     private String user;
 
     public AbstractDelegationTokenRenewerAppEvent(ApplicationId appId,
-        Credentials credentails, boolean shouldCancelAtEnd, String user,
-        DelegationTokenRenewerEventType type) {
+        Credentials credentials, boolean shouldCancelAtEnd, String user,
+        DelegationTokenRenewerEventType type, Configuration tokenConf) {
       super(appId, type);
-      this.credentials = credentails;
+      this.credentials = credentials;
       this.shouldCancelAtEnd = shouldCancelAtEnd;
       this.user = user;
+      this.tokenConf = tokenConf;
     }
 
     public Credentials getCredentials() {

+ 24 - 12
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java

@@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.resourcemanager;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.security.PrivilegedAction;
+import java.security.PrivilegedExceptionAction;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.EnumSet;
@@ -563,7 +564,7 @@ public class MockRM extends ResourceManager {
     return submitApp(resource, name, user, acls, false, queue,
       super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
       YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null, true, false,
-        false, null, 0, null, true, priority, amLabel, null);
+        false, null, 0, null, true, priority, amLabel, null, null);
   }
 
   public RMApp submitApp(Resource resource, String name, String user,
@@ -664,7 +665,17 @@ public class MockRM extends ResourceManager {
     return submitApp(capability, name, user, acls, unmanaged, queue,
       maxAppAttempts, ts, appType, waitForAccepted, keepContainers,
       isAppIdProvided, applicationId, attemptFailuresValidityInterval,
-        logAggregationContext, cancelTokensWhenComplete, priority, "", null);
+        logAggregationContext, cancelTokensWhenComplete, priority, "", null,
+        null);
+  }
+
+  public RMApp submitApp(Credentials cred, ByteBuffer tokensConf)
+      throws Exception {
+    return submitApp(Resource.newInstance(200, 1), "app1", "user", null, false,
+        null, super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
+            YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), cred, null, true,
+        false, false, null, 0, null, true, Priority.newInstance(0), null, null,
+        tokensConf);
   }
 
   public RMApp submitApp(Resource capability, String name, String user,
@@ -674,7 +685,8 @@ public class MockRM extends ResourceManager {
       ApplicationId applicationId, long attemptFailuresValidityInterval,
       LogAggregationContext logAggregationContext,
       boolean cancelTokensWhenComplete, Priority priority, String amLabel,
-      Map<ApplicationTimeoutType, Long> applicationTimeouts)
+      Map<ApplicationTimeoutType, Long> applicationTimeouts,
+      ByteBuffer tokensConf)
       throws Exception {
     ApplicationId appId = isAppIdProvided ? applicationId : null;
     ApplicationClientProtocol client = getClientRMService();
@@ -713,6 +725,7 @@ public class MockRM extends ResourceManager {
       ts.writeTokenStorageToStream(dob);
       ByteBuffer securityTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
       clc.setTokens(securityTokens);
+      clc.setTokensConf(tokensConf);
     }
     sub.setAMContainerSpec(clc);
     sub.setAttemptFailuresValidityInterval(attemptFailuresValidityInterval);
@@ -729,22 +742,20 @@ public class MockRM extends ResourceManager {
     req.setApplicationSubmissionContext(sub);
     UserGroupInformation fakeUser =
       UserGroupInformation.createUserForTesting(user, new String[] {"someGroup"});
-    PrivilegedAction<SubmitApplicationResponse> action =
-      new PrivilegedAction<SubmitApplicationResponse>() {
+    PrivilegedExceptionAction<SubmitApplicationResponse> action =
+      new PrivilegedExceptionAction<SubmitApplicationResponse>() {
       ApplicationClientProtocol client;
       SubmitApplicationRequest req;
       @Override
-      public SubmitApplicationResponse run() {
+      public SubmitApplicationResponse run() throws IOException, YarnException {
         try {
           return client.submitApplication(req);
-        } catch (YarnException e) {
-          e.printStackTrace();
-        } catch (IOException e) {
+        } catch (YarnException | IOException e) {
           e.printStackTrace();
+          throw  e;
         }
-        return null;
       }
-      PrivilegedAction<SubmitApplicationResponse> setClientReq(
+      PrivilegedExceptionAction<SubmitApplicationResponse> setClientReq(
         ApplicationClientProtocol client, SubmitApplicationRequest req) {
         this.client = client;
         this.req = req;
@@ -1224,6 +1235,7 @@ public class MockRM extends ResourceManager {
         null, false, null,
         super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
             YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null, true,
-        false, false, null, 0, null, true, priority, null, applicationTimeouts);
+        false, false, null, 0, null, true, priority, null, applicationTimeouts,
+        null);
   }
 }

+ 10 - 31
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java

@@ -38,9 +38,11 @@ import java.util.concurrent.ConcurrentMap;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.service.Service;
 import org.apache.hadoop.yarn.MockApps;
 import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
@@ -259,6 +261,7 @@ public class TestAppManager{
   public void tearDown() {
     setAppEventType(RMAppEventType.KILL);
     ((Service)rmContext.getDispatcher()).stop();
+    UserGroupInformation.reset();
   }
 
   @Test
@@ -311,13 +314,15 @@ public class TestAppManager{
             ResourceRequest.ANY, Resource.newInstance(1024, 1), 1);
     sub.setAMContainerResourceRequest(resReg);
     req.setApplicationSubmissionContext(sub);
+    sub.setAMContainerSpec(mock(ContainerLaunchContext.class));
     try {
       rmService.submitApplication(req);
     } catch (Exception e) {
+      e.printStackTrace();
       if (e instanceof YarnException) {
         Assert.assertTrue(e.getCause() instanceof AccessControlException);
       } else {
-        Assert.fail("Yarn exception is expected");
+        Assert.fail("Yarn exception is expected : " + e.getMessage());
       }
     } finally {
       mockRM.close();
@@ -543,6 +548,10 @@ public class TestAppManager{
     DataOutputBuffer dob = new DataOutputBuffer();
     ByteBuffer securityTokens = ByteBuffer.wrap(dob.getData(), 0,
         dob.getLength());
+    Configuration conf = new Configuration();
+    conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
+        "kerberos");
+    UserGroupInformation.setConfiguration(conf);
     asContext.getAMContainerSpec().setTokens(securityTokens);
     try {
       appMonitor.submitApplication(asContext, "test");
@@ -564,36 +573,6 @@ public class TestAppManager{
     asContext.getAMContainerSpec().setTokens(null);
   }
 
-  @Test
-  public void testRMAppSubmitWithValidTokens() throws Exception {
-    // Setup valid security tokens
-    DataOutputBuffer dob = new DataOutputBuffer();
-    Credentials credentials = new Credentials();
-    credentials.writeTokenStorageToStream(dob);
-    ByteBuffer securityTokens = ByteBuffer.wrap(dob.getData(), 0,
-        dob.getLength());
-    asContext.getAMContainerSpec().setTokens(securityTokens);
-    appMonitor.submitApplication(asContext, "test");
-    RMApp app = rmContext.getRMApps().get(appId);
-    Assert.assertNotNull("app is null", app);
-    Assert.assertEquals("app id doesn't match", appId,
-        app.getApplicationId());
-    Assert.assertEquals("app state doesn't match", RMAppState.NEW,
-        app.getState());
-    verify(metricsPublisher).appACLsUpdated(
-        any(RMApp.class), any(String.class), anyLong());
-
-    // wait for event to be processed
-    int timeoutSecs = 0;
-    while ((getAppEventType() == RMAppEventType.KILL) &&
-        timeoutSecs++ < 20) {
-      Thread.sleep(1000);
-    }
-    Assert.assertEquals("app event type sent is wrong", RMAppEventType.START,
-        getAppEventType());
-    asContext.getAMContainerSpec().setTokens(null);
-  }
-
   @Test (timeout = 30000)
   public void testRMAppSubmitMaxAppAttempts() throws Exception {
     int[] globalMaxAppAttempts = new int[] { 10, 1 };

+ 17 - 20
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java

@@ -112,6 +112,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer;
 import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
 import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext;
 import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl;
@@ -1745,32 +1746,28 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
     memStore.init(conf);
 
     MockRM rm1 = new TestSecurityMockRM(conf, memStore) {
-      @Override
-      protected RMAppManager createRMAppManager() {
-        return new TestRMAppManager(this.rmContext, this.scheduler,
-          this.masterService, this.applicationACLsManager, conf);
-      }
-
-      class TestRMAppManager extends RMAppManager {
-
-        public TestRMAppManager(RMContext context, YarnScheduler scheduler,
-            ApplicationMasterService masterService,
-            ApplicationACLsManager applicationACLsManager, Configuration conf) {
-          super(context, scheduler, masterService, applicationACLsManager, conf);
-        }
-
-        @Override
-        protected Credentials parseCredentials(
-            ApplicationSubmissionContext application) throws IOException {
-          throw new IOException("Parsing credential error.");
+      class TestDelegationTokenRenewer extends DelegationTokenRenewer {
+        public void addApplicationAsync(ApplicationId applicationId, Credentials ts,
+            boolean shouldCancelAtEnd, String user, Configuration appConf) {
+          throw new RuntimeException("failed to submit app");
         }
       }
+      @Override
+      protected DelegationTokenRenewer createDelegationTokenRenewer() {
+        return new TestDelegationTokenRenewer();
+      }
     };
     rm1.start();
-    RMApp app1 =
-        rm1.submitApp(200, "name", "user",
+    RMApp app1 = null;
+    try {
+       app1 = rm1.submitApp(200, "name", "user",
           new HashMap<ApplicationAccessType, String>(), false, "default", -1,
           null, "MAPREDUCE", false);
+      Assert.fail();
+    } catch (Exception e) {
+
+    }
+    app1 = rm1.getRMContext().getRMApps().values().iterator().next();
     rm1.waitForState(app1.getApplicationId(), RMAppState.FAILED);
     // Check app staet is saved in state store.
     Assert.assertEquals(RMAppState.FAILED, memStore.getState()

+ 5 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java

@@ -30,6 +30,7 @@ import java.util.Map.Entry;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.LogAggregationStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
@@ -490,9 +491,10 @@ public class TestRMAppLogAggregationStatus {
 
   private RMApp createRMApp(Configuration conf) {
     ApplicationSubmissionContext submissionContext =
-        ApplicationSubmissionContext.newInstance(appId, "test", "default",
-          Priority.newInstance(0), null, false, true,
-          2, Resource.newInstance(10, 2), "test");
+        ApplicationSubmissionContext
+            .newInstance(appId, "test", "default", Priority.newInstance(0),
+                mock(ContainerLaunchContext.class), false, true, 2,
+                Resource.newInstance(10, 2), "test");
     return new RMAppImpl(this.appId, this.rmContext,
       conf, "test", "test", "default", submissionContext,
       scheduler,

+ 1 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java

@@ -268,7 +268,7 @@ public class TestRMAppTransitions {
     // but applicationId is still set for safety
     submissionContext.setApplicationId(applicationId);
     submissionContext.setPriority(Priority.newInstance(0));
-
+    submissionContext.setAMContainerSpec(mock(ContainerLaunchContext.class));
     RMApp application = new RMAppImpl(applicationId, rmContext, conf, name,
         user, queue, submissionContext, scheduler, masterService,
         System.currentTimeMillis(), "YARN", null, mock(ResourceRequest.class));

+ 5 - 4
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java

@@ -23,6 +23,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
@@ -244,10 +245,10 @@ public class FairSchedulerTestBase {
       String queue, String user, Resource amResource) {
     RMContext rmContext = resourceManager.getRMContext();
     ApplicationId appId = attId.getApplicationId();
-    RMApp rmApp = new RMAppImpl(appId, rmContext, conf,
-        null, user, null, ApplicationSubmissionContext.newInstance(appId, null,
-        queue, null, null, false, false, 0, amResource, null), scheduler, null,
-        0, null, null, null);
+    RMApp rmApp = new RMAppImpl(appId, rmContext, conf, null, user, null,
+        ApplicationSubmissionContext.newInstance(appId, null, queue, null,
+            mock(ContainerLaunchContext.class), false, false, 0, amResource,
+            null), scheduler, null, 0, null, null, null);
     rmContext.getRMApps().put(appId, rmApp);
     RMAppEvent event = new RMAppEvent(appId, RMAppEventType.START);
     resourceManager.getRMContext().getRMApps().get(appId).handle(event);

+ 119 - 8
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java

@@ -57,6 +57,7 @@ import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifie
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.io.DataInputByteBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -392,7 +393,8 @@ public class TestDelegationTokenRenewer {
     // register the tokens for renewal
     ApplicationId applicationId_0 = 
         BuilderUtils.newApplicationId(0, 0);
-    delegationTokenRenewer.addApplicationAsync(applicationId_0, ts, true, "user");
+    delegationTokenRenewer.addApplicationAsync(applicationId_0, ts, true, "user",
+        new Configuration());
     waitForEventsToGetProcessed(delegationTokenRenewer);
 
     // first 3 initial renewals + 1 real
@@ -432,7 +434,8 @@ public class TestDelegationTokenRenewer {
     
 
     ApplicationId applicationId_1 = BuilderUtils.newApplicationId(0, 1);
-    delegationTokenRenewer.addApplicationAsync(applicationId_1, ts, true, "user");
+    delegationTokenRenewer.addApplicationAsync(applicationId_1, ts, true, "user",
+        new Configuration());
     waitForEventsToGetProcessed(delegationTokenRenewer);
     delegationTokenRenewer.applicationFinished(applicationId_1);
     waitForEventsToGetProcessed(delegationTokenRenewer);
@@ -468,7 +471,8 @@ public class TestDelegationTokenRenewer {
     
     // register the tokens for renewal
     ApplicationId appId =  BuilderUtils.newApplicationId(0, 0);
-    delegationTokenRenewer.addApplicationAsync(appId, ts, true, "user");
+    delegationTokenRenewer.addApplicationAsync(appId, ts, true, "user",
+        new Configuration());
     int waitCnt = 20;
     while (waitCnt-- >0) {
       if (!eventQueue.isEmpty()) {
@@ -531,7 +535,8 @@ public class TestDelegationTokenRenewer {
     
 
     ApplicationId applicationId_1 = BuilderUtils.newApplicationId(0, 1);
-    delegationTokenRenewer.addApplicationAsync(applicationId_1, ts, false, "user");
+    delegationTokenRenewer.addApplicationAsync(applicationId_1, ts, false, "user",
+        new Configuration());
     waitForEventsToGetProcessed(delegationTokenRenewer);
     delegationTokenRenewer.applicationFinished(applicationId_1);
     waitForEventsToGetProcessed(delegationTokenRenewer);
@@ -600,7 +605,8 @@ public class TestDelegationTokenRenewer {
 
     // register the tokens for renewal
     ApplicationId applicationId_0 =  BuilderUtils.newApplicationId(0, 0);
-    localDtr.addApplicationAsync(applicationId_0, ts, true, "user");
+    localDtr.addApplicationAsync(applicationId_0, ts, true, "user",
+        new Configuration());
     waitForEventsToGetProcessed(localDtr);
     if (!eventQueue.isEmpty()){
       Event evt = eventQueue.take();
@@ -679,7 +685,8 @@ public class TestDelegationTokenRenewer {
 
     // register the tokens for renewal
     ApplicationId applicationId_0 =  BuilderUtils.newApplicationId(0, 0);
-    localDtr.addApplicationAsync(applicationId_0, ts, true, "user");
+    localDtr.addApplicationAsync(applicationId_0, ts, true, "user",
+        new Configuration());
     localDtr.applicationFinished(applicationId_0);
     waitForEventsToGetProcessed(delegationTokenRenewer);
     //Send another keep alive.
@@ -831,14 +838,16 @@ public class TestDelegationTokenRenewer {
     Thread submitThread = new Thread() {                                       
       @Override                                                                
       public void run() {
-        dtr.addApplicationAsync(mock(ApplicationId.class), creds1, false, "user");
+        dtr.addApplicationAsync(mock(ApplicationId.class), creds1, false, "user",
+            new Configuration());
       }                                                                        
     };                                                                         
     submitThread.start();                                                      
                                                                                
     // wait till 1st submit blocks, then submit another
     startBarrier.await();                           
-    dtr.addApplicationAsync(mock(ApplicationId.class), creds2, false, "user");
+    dtr.addApplicationAsync(mock(ApplicationId.class), creds2, false, "user",
+        new Configuration());
     // signal 1st to complete                                                  
     endBarrier.await();                                                        
     submitThread.join(); 
@@ -1273,4 +1282,106 @@ public class TestDelegationTokenRenewer {
       }
     }, 10, 10000);
   }
+
+  // Test DelegationTokenRenewer uses the tokenConf provided by application
+  // for token renewal.
+  @Test
+  public void testRenewTokenUsingTokenConfProvidedByApp() throws Exception{
+    conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
+        "kerberos");
+    UserGroupInformation.setConfiguration(conf);
+
+    MockRM rm = new TestSecurityMockRM(conf, null);
+    rm.start();
+    final MockNM nm1 =
+        new MockNM("127.0.0.1:1234", 15120, rm.getResourceTrackerService());
+    nm1.registerNode();
+
+    // create a token
+    Text userText1 = new Text("user1");
+    DelegationTokenIdentifier dtId1 =
+        new DelegationTokenIdentifier(userText1, new Text("renewer1"),
+            userText1);
+    final Token<DelegationTokenIdentifier> token1 =
+        new Token<DelegationTokenIdentifier>(dtId1.getBytes(),
+            "password1".getBytes(), dtId1.getKind(), new Text("service1"));
+    Credentials credentials = new Credentials();
+    credentials.addToken(userText1, token1);
+
+    // create token conf for renewal
+    Configuration appConf = new Configuration(false);
+    appConf.set("dfs.nameservices", "mycluster1,mycluster2");
+    appConf.set("dfs.namenode.rpc-address.mycluster2.nn1", "123.0.0.1");
+    appConf.set("dfs.namenode.rpc-address.mycluster2.nn2", "123.0.0.2");
+    appConf.set("dfs.ha.namenodes.mycluster2", "nn1,nn2");
+    appConf.set("dfs.client.failover.proxy.provider.mycluster2", "provider");
+    DataOutputBuffer dob = new DataOutputBuffer();
+    appConf.write(dob);
+    ByteBuffer tokenConf = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
+    final int confSize = appConf.size();
+
+    // submit app
+    RMApp app = rm.submitApp(credentials, tokenConf);
+
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      public Boolean get() {
+        DelegationTokenToRenew toRenew =
+            rm.getRMContext().getDelegationTokenRenewer().getAllTokens()
+                .get(token1);
+        // check app conf size equals to original size and it does contain
+        // the specific config we added.
+        return toRenew != null && toRenew.conf != null
+            && toRenew.conf.size() == confSize && toRenew.conf
+            .get("dfs.namenode.rpc-address.mycluster2.nn1").equals("123.0.0.1");
+      }
+    }, 200, 10000);
+  }
+
+  // Test if app's token conf exceeds RM_DELEGATION_TOKEN_MAX_CONF_SIZE,
+  // app should fail
+  @Test
+  public void testTokensConfExceedLimit() throws Exception {
+    conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
+        "kerberos");
+    UserGroupInformation.setConfiguration(conf);
+    // limit 100 bytes
+    conf.setInt(YarnConfiguration.RM_DELEGATION_TOKEN_MAX_CONF_SIZE, 100);
+    MockRM rm = new TestSecurityMockRM(conf, null);
+    rm.start();
+    final MockNM nm1 =
+        new MockNM("127.0.0.1:1234", 15120, rm.getResourceTrackerService());
+    nm1.registerNode();
+
+    // create a token
+    Text userText1 = new Text("user1");
+    DelegationTokenIdentifier dtId1 =
+        new DelegationTokenIdentifier(userText1, new Text("renewer1"),
+            userText1);
+    final Token<DelegationTokenIdentifier> token1 =
+        new Token<DelegationTokenIdentifier>(dtId1.getBytes(),
+            "password1".getBytes(), dtId1.getKind(), new Text("service1"));
+    Credentials credentials = new Credentials();
+    credentials.addToken(userText1, token1);
+
+    // create token conf for renewal, total size (512 bytes) > limit (100 bytes)
+    // By experiment, it's roughly 128 bytes per key-value pair.
+    Configuration appConf = new Configuration(false);
+    appConf.clear();
+    appConf.set("dfs.nameservices", "mycluster1,mycluster2"); // 128 bytes
+    appConf.set("dfs.namenode.rpc-address.mycluster2.nn1", "123.0.0.1"); //128 bytes
+    appConf.set("dfs.namenode.rpc-address.mycluster3.nn2", "123.0.0.2"); // 128 bytes
+
+    DataOutputBuffer dob = new DataOutputBuffer();
+    appConf.write(dob);
+    ByteBuffer tokenConf = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
+
+    try {
+      rm.submitApp(credentials, tokenConf);
+      Assert.fail();
+    } catch (Exception e) {
+      e.printStackTrace();
+      Assert.assertTrue(e.getCause().getMessage()
+          .contains(YarnConfiguration.RM_DELEGATION_TOKEN_MAX_CONF_SIZE));
+    }
+  }
 }