Ver Fonte

MAPREDUCE-5088. MR Client gets an renewer token exception while Oozie is submitting a job (daryn)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1464147 13f79535-47bb-0310-9956-ffa450edef68
Konstantin Boudnik há 12 anos atrás
pai
commit
347d9f6b98

+ 3 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -125,6 +125,9 @@ Release 2.0.4-alpha - UNRELEASED
 
 
   BUG FIXES
   BUG FIXES
 
 
+    MAPREDUCE-5088. MR Client gets an renewer token exception while Oozie is
+    submitting a job (Daryn Sharp via cos)
+
     MAPREDUCE-5117. Changed MRClientProtocolPBClientImpl to be closeable and thus
     MAPREDUCE-5117. Changed MRClientProtocolPBClientImpl to be closeable and thus
     fix failures in renewal of HistoryServer's delegations tokens. (Siddharth
     fix failures in renewal of HistoryServer's delegations tokens. (Siddharth
     Seth via vinodkv)
     Seth via vinodkv)

+ 0 - 14
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobClient.java

@@ -138,15 +138,6 @@ import org.apache.hadoop.util.ToolRunner;
 public class JobClient extends CLI {
 public class JobClient extends CLI {
   public static enum TaskStatusFilter { NONE, KILLED, FAILED, SUCCEEDED, ALL }
   public static enum TaskStatusFilter { NONE, KILLED, FAILED, SUCCEEDED, ALL }
   private TaskStatusFilter taskOutputFilter = TaskStatusFilter.FAILED; 
   private TaskStatusFilter taskOutputFilter = TaskStatusFilter.FAILED; 
-  /* notes that get delegation token was called. Again this is hack for oozie 
-   * to make sure we add history server delegation tokens to the credentials
-   *  for the job. Since the api only allows one delegation token to be returned, 
-   *  we have to add this hack.
-   */
-  private boolean getDelegationTokenCalled = false;
-  /* do we need a HS delegation token for this client */
-  static final String HS_DELEGATION_TOKEN_REQUIRED 
-      = "mapreduce.history.server.delegationtoken.required";
   
   
   static{
   static{
     ConfigUtil.loadResources();
     ConfigUtil.loadResources();
@@ -569,10 +560,6 @@ public class JobClient extends CLI {
     try {
     try {
       conf.setBooleanIfUnset("mapred.mapper.new-api", false);
       conf.setBooleanIfUnset("mapred.mapper.new-api", false);
       conf.setBooleanIfUnset("mapred.reducer.new-api", false);
       conf.setBooleanIfUnset("mapred.reducer.new-api", false);
-      if (getDelegationTokenCalled) {
-        conf.setBoolean(HS_DELEGATION_TOKEN_REQUIRED, getDelegationTokenCalled);
-        getDelegationTokenCalled = false;
-      }
       Job job = clientUgi.doAs(new PrivilegedExceptionAction<Job> () {
       Job job = clientUgi.doAs(new PrivilegedExceptionAction<Job> () {
         @Override
         @Override
         public Job run() throws IOException, ClassNotFoundException, 
         public Job run() throws IOException, ClassNotFoundException, 
@@ -1171,7 +1158,6 @@ public class JobClient extends CLI {
    */
    */
   public Token<DelegationTokenIdentifier> 
   public Token<DelegationTokenIdentifier> 
     getDelegationToken(final Text renewer) throws IOException, InterruptedException {
     getDelegationToken(final Text renewer) throws IOException, InterruptedException {
-    getDelegationTokenCalled = true;
     return clientUgi.doAs(new 
     return clientUgi.doAs(new 
         PrivilegedExceptionAction<Token<DelegationTokenIdentifier>>() {
         PrivilegedExceptionAction<Token<DelegationTokenIdentifier>>() {
       public Token<DelegationTokenIdentifier> run() throws IOException, 
       public Token<DelegationTokenIdentifier> run() throws IOException, 

+ 5 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java

@@ -19,6 +19,7 @@
 package org.apache.hadoop.mapred;
 package org.apache.hadoop.mapred;
 
 
 import java.io.IOException;
 import java.io.IOException;
+import java.net.InetSocketAddress;
 
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.LogFactory;
@@ -87,6 +88,10 @@ public class ResourceMgrDelegate extends YarnClientImpl {
     return oldMetrics;
     return oldMetrics;
   }
   }
 
 
+  InetSocketAddress getConnectAddress() {
+    return rmAddress;
+  }
+  
   @SuppressWarnings("rawtypes")
   @SuppressWarnings("rawtypes")
   public Token getDelegationToken(Text renewer) throws IOException,
   public Token getDelegationToken(Text renewer) throws IOException,
       InterruptedException {
       InterruptedException {

+ 27 - 21
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java

@@ -60,6 +60,7 @@ import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDelegationTokenRequ
 import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
 import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
 import org.apache.hadoop.mapreduce.v2.util.MRApps;
 import org.apache.hadoop.mapreduce.v2.util.MRApps;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.AccessControlList;
 import org.apache.hadoop.security.authorize.AccessControlList;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.Token;
@@ -81,6 +82,7 @@ import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.security.client.RMTokenSelector;
 import org.apache.hadoop.yarn.util.BuilderUtils;
 import org.apache.hadoop.yarn.util.BuilderUtils;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.hadoop.yarn.util.ProtoUtils;
 import org.apache.hadoop.yarn.util.ProtoUtils;
@@ -90,7 +92,7 @@ import com.google.common.annotations.VisibleForTesting;
 /**
 /**
  * This class enables the current JobClient (0.22 hadoop) to run on YARN.
  * This class enables the current JobClient (0.22 hadoop) to run on YARN.
  */
  */
-@SuppressWarnings({ "rawtypes", "unchecked" })
+@SuppressWarnings("unchecked")
 public class YARNRunner implements ClientProtocol {
 public class YARNRunner implements ClientProtocol {
 
 
   private static final Log LOG = LogFactory.getLog(YARNRunner.class);
   private static final Log LOG = LogFactory.getLog(YARNRunner.class);
@@ -101,14 +103,6 @@ public class YARNRunner implements ClientProtocol {
   private Configuration conf;
   private Configuration conf;
   private final FileContext defaultFileContext;
   private final FileContext defaultFileContext;
   
   
-  /* usually is false unless the jobclient get delegation token is 
-   *  called. This is a hack wherein we do return a token from RM 
-   *  on getDelegationtoken but due to the restricted api on jobclient
-   *  we just add a job history DT token when submitting a job.
-   */
-  private static final boolean DEFAULT_HS_DELEGATION_TOKEN_REQUIRED = 
-      false;
-  
   /**
   /**
    * Yarn runner incapsulates the client interface of
    * Yarn runner incapsulates the client interface of
    * yarn
    * yarn
@@ -185,6 +179,28 @@ public class YARNRunner implements ClientProtocol {
     return resMgrDelegate.getClusterMetrics();
     return resMgrDelegate.getClusterMetrics();
   }
   }
 
 
+  @VisibleForTesting
+  void addHistoyToken(Credentials ts) throws IOException, InterruptedException {
+    /* check if we have a hsproxy, if not, no need */
+    MRClientProtocol hsProxy = clientCache.getInitializedHSProxy();
+    if (UserGroupInformation.isSecurityEnabled() && (hsProxy != null)) {
+      /*
+       * note that get delegation token was called. Again this is hack for oozie
+       * to make sure we add history server delegation tokens to the credentials
+       */
+      RMTokenSelector tokenSelector = new RMTokenSelector();
+      Text service = SecurityUtil.buildTokenService(resMgrDelegate
+          .getConnectAddress());
+      if (tokenSelector.selectToken(service, ts.getAllTokens()) != null) {
+        Text hsService = SecurityUtil.buildTokenService(hsProxy
+            .getConnectAddress());
+        if (ts.getToken(hsService) == null) {
+          ts.addToken(hsService, getDelegationTokenFromHS(hsProxy));
+        }
+      }
+    }
+  }
+  
   @VisibleForTesting
   @VisibleForTesting
   Token<?> getDelegationTokenFromHS(MRClientProtocol hsProxy)
   Token<?> getDelegationTokenFromHS(MRClientProtocol hsProxy)
       throws IOException, InterruptedException {
       throws IOException, InterruptedException {
@@ -263,18 +279,8 @@ public class YARNRunner implements ClientProtocol {
   public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts)
   public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts)
   throws IOException, InterruptedException {
   throws IOException, InterruptedException {
     
     
-    /* check if we have a hsproxy, if not, no need */
-    MRClientProtocol hsProxy = clientCache.getInitializedHSProxy();
-    if (hsProxy != null) {
-      // JobClient will set this flag if getDelegationToken is called, if so, get
-      // the delegation tokens for the HistoryServer also.
-      if (conf.getBoolean(JobClient.HS_DELEGATION_TOKEN_REQUIRED, 
-          DEFAULT_HS_DELEGATION_TOKEN_REQUIRED)) {
-        Token hsDT = getDelegationTokenFromHS(hsProxy);
-        ts.addToken(hsDT.getService(), hsDT);
-      }
-    }
-
+    addHistoyToken(ts);
+    
     // Upload only in security mode: TODO
     // Upload only in security mode: TODO
     Path applicationTokensFile =
     Path applicationTokensFile =
         new Path(jobSubmitDir, MRJobConfig.APPLICATION_TOKENS_FILE);
         new Path(jobSubmitDir, MRJobConfig.APPLICATION_TOKENS_FILE);

+ 102 - 17
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java

@@ -20,8 +20,10 @@ package org.apache.hadoop.mapred;
 
 
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 import static org.mockito.Mockito.when;
 
 
@@ -30,6 +32,7 @@ import java.io.File;
 import java.io.FileOutputStream;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.io.OutputStream;
+import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.nio.ByteBuffer;
 import java.security.PrivilegedExceptionAction;
 import java.security.PrivilegedExceptionAction;
 import java.util.List;
 import java.util.List;
@@ -39,28 +42,24 @@ import junit.framework.TestCase;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.ClientCache;
-import org.apache.hadoop.mapred.ClientServiceDelegate;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Master;
-import org.apache.hadoop.mapred.ResourceMgrDelegate;
-import org.apache.hadoop.mapred.YARNRunner;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.JobPriority;
 import org.apache.hadoop.mapreduce.JobPriority;
 import org.apache.hadoop.mapreduce.JobStatus.State;
 import org.apache.hadoop.mapreduce.JobStatus.State;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
 import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
+import org.apache.hadoop.mapreduce.v2.api.MRDelegationTokenIdentifier;
 import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDelegationTokenRequest;
 import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDelegationTokenRequest;
 import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDelegationTokenResponse;
 import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDelegationTokenResponse;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.yarn.api.ClientRMProtocol;
 import org.apache.hadoop.yarn.api.ClientRMProtocol;
-import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
@@ -69,21 +68,27 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.DelegationToken;
 import org.apache.hadoop.yarn.api.records.DelegationToken;
 import org.apache.hadoop.yarn.api.records.QueueInfo;
 import org.apache.hadoop.yarn.api.records.QueueInfo;
-import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
+import org.apache.hadoop.yarn.util.BuilderUtils;
+import org.apache.hadoop.yarn.util.Records;
 import org.apache.log4j.Appender;
 import org.apache.log4j.Appender;
 import org.apache.log4j.Layout;
 import org.apache.log4j.Layout;
 import org.apache.log4j.Logger;
 import org.apache.log4j.Logger;
@@ -146,7 +151,7 @@ public class TestYARNRunner extends TestCase {
    }
    }
 
 
 
 
-  @Test
+  @Test(timeout=20000)
   public void testJobKill() throws Exception {
   public void testJobKill() throws Exception {
     clientDelegate = mock(ClientServiceDelegate.class);
     clientDelegate = mock(ClientServiceDelegate.class);
     when(clientDelegate.getJobStatus(any(JobID.class))).thenReturn(new
     when(clientDelegate.getJobStatus(any(JobID.class))).thenReturn(new
@@ -171,7 +176,7 @@ public class TestYARNRunner extends TestCase {
     verify(clientDelegate).killJob(jobId);
     verify(clientDelegate).killJob(jobId);
   }
   }
 
 
-  @Test
+  @Test(timeout=20000)
   public void testJobSubmissionFailure() throws Exception {
   public void testJobSubmissionFailure() throws Exception {
     when(resourceMgrDelegate.submitApplication(any(ApplicationSubmissionContext.class))).
     when(resourceMgrDelegate.submitApplication(any(ApplicationSubmissionContext.class))).
     thenReturn(appId);
     thenReturn(appId);
@@ -193,7 +198,7 @@ public class TestYARNRunner extends TestCase {
     }
     }
   }
   }
 
 
-  @Test
+  @Test(timeout=20000)
   public void testResourceMgrDelegate() throws Exception {
   public void testResourceMgrDelegate() throws Exception {
     /* we not want a mock of resource mgr delegate */
     /* we not want a mock of resource mgr delegate */
     final ClientRMProtocol clientRMProtocol = mock(ClientRMProtocol.class);
     final ClientRMProtocol clientRMProtocol = mock(ClientRMProtocol.class);
@@ -259,8 +264,88 @@ public class TestYARNRunner extends TestCase {
     delegate.getQueueAclsForCurrentUser();
     delegate.getQueueAclsForCurrentUser();
     verify(clientRMProtocol).getQueueUserAcls(any(GetQueueUserAclsInfoRequest.class));
     verify(clientRMProtocol).getQueueUserAcls(any(GetQueueUserAclsInfoRequest.class));
   }
   }
-  
-  @Test
+
+  @Test(timeout=20000)
+  public void testGetHSDelegationToken() throws Exception {
+    try {
+      Configuration conf = new Configuration();
+
+      // Setup mock service
+      InetSocketAddress mockRmAddress = new InetSocketAddress("localhost", 4444);
+      Text rmTokenSevice = SecurityUtil.buildTokenService(mockRmAddress);
+
+      InetSocketAddress mockHsAddress = new InetSocketAddress("localhost", 9200);
+      Text hsTokenSevice = SecurityUtil.buildTokenService(mockHsAddress);
+
+      // Setup mock rm token
+      RMDelegationTokenIdentifier tokenIdentifier = new RMDelegationTokenIdentifier(
+          new Text("owner"), new Text("renewer"), new Text("real"));
+      Token<RMDelegationTokenIdentifier> token = new Token<RMDelegationTokenIdentifier>(
+          new byte[0], new byte[0], tokenIdentifier.getKind(), rmTokenSevice);
+      token.setKind(RMDelegationTokenIdentifier.KIND_NAME);
+
+      // Setup mock history token
+      DelegationToken historyToken = BuilderUtils.newDelegationToken(
+          new byte[0], MRDelegationTokenIdentifier.KIND_NAME.toString(),
+          new byte[0], hsTokenSevice.toString());
+      GetDelegationTokenResponse getDtResponse = Records
+          .newRecord(GetDelegationTokenResponse.class);
+      getDtResponse.setDelegationToken(historyToken);
+
+      // mock services
+      MRClientProtocol mockHsProxy = mock(MRClientProtocol.class);
+      doReturn(mockHsAddress).when(mockHsProxy).getConnectAddress();
+      doReturn(getDtResponse).when(mockHsProxy).getDelegationToken(
+          any(GetDelegationTokenRequest.class));
+
+      ResourceMgrDelegate rmDelegate = mock(ResourceMgrDelegate.class);
+      doReturn(mockRmAddress).when(rmDelegate).getConnectAddress();
+
+      ClientCache clientCache = mock(ClientCache.class);
+      doReturn(mockHsProxy).when(clientCache).getInitializedHSProxy();
+
+      Credentials creds = new Credentials();
+
+      YARNRunner yarnRunner = new YARNRunner(conf, rmDelegate, clientCache);
+
+      // No HS token if no RM token
+      yarnRunner.addHistoyToken(creds);
+      verify(mockHsProxy, times(0)).getDelegationToken(
+          any(GetDelegationTokenRequest.class));
+
+      // No HS token if RM token, but secirity disabled.
+      creds.addToken(new Text("rmdt"), token);
+      yarnRunner.addHistoyToken(creds);
+      verify(mockHsProxy, times(0)).getDelegationToken(
+          any(GetDelegationTokenRequest.class));
+
+      conf.set(CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION,
+          "kerberos");
+      UserGroupInformation.setConfiguration(conf);
+      creds = new Credentials();
+
+      // No HS token if no RM token, security enabled
+      yarnRunner.addHistoyToken(creds);
+      verify(mockHsProxy, times(0)).getDelegationToken(
+          any(GetDelegationTokenRequest.class));
+
+      // HS token if RM token present, security enabled
+      creds.addToken(new Text("rmdt"), token);
+      yarnRunner.addHistoyToken(creds);
+      verify(mockHsProxy, times(1)).getDelegationToken(
+          any(GetDelegationTokenRequest.class));
+
+      // No additional call to get HS token if RM and HS token present
+      yarnRunner.addHistoyToken(creds);
+      verify(mockHsProxy, times(1)).getDelegationToken(
+          any(GetDelegationTokenRequest.class));
+    } finally {
+      // Back to defaults.
+      UserGroupInformation.setConfiguration(new Configuration());
+    }
+  }
+
+  @Test(timeout=20000)
   public void testHistoryServerToken() throws Exception {
   public void testHistoryServerToken() throws Exception {
     //Set the master principal in the config
     //Set the master principal in the config
     conf.set(YarnConfiguration.RM_PRINCIPAL,"foo@LOCAL");
     conf.set(YarnConfiguration.RM_PRINCIPAL,"foo@LOCAL");
@@ -303,7 +388,7 @@ public class TestYARNRunner extends TestCase {
         });
         });
   }
   }
 
 
-  @Test
+  @Test(timeout=20000)
   public void testAMAdminCommandOpts() throws Exception {
   public void testAMAdminCommandOpts() throws Exception {
     JobConf jobConf = new JobConf();
     JobConf jobConf = new JobConf();
     
     
@@ -366,7 +451,7 @@ public class TestYARNRunner extends TestCase {
       assertTrue("AM admin command opts is after user command opts.", adminIndex < userIndex);
       assertTrue("AM admin command opts is after user command opts.", adminIndex < userIndex);
     }
     }
   }
   }
-  @Test
+  @Test(timeout=20000)
   public void testWarnCommandOpts() throws Exception {
   public void testWarnCommandOpts() throws Exception {
     Logger logger = Logger.getLogger(YARNRunner.class);
     Logger logger = Logger.getLogger(YARNRunner.class);