Sfoglia il codice sorgente

YARN-10348. Allow RM to always cancel tokens after app completes. Contributed by
Jim Brennan.

(cherry picked from commit 09f1547697d0aa51380a0351df6d77f54af074a0)

Eric Badger 4 anni fa
parent
commit
4cf5c282d0

+ 3 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java

@@ -725,6 +725,9 @@ public class YarnConfiguration extends Configuration {
       RM_PREFIX + "delegation-token.max-conf-size-bytes";
   public static final int DEFAULT_RM_DELEGATION_TOKEN_MAX_CONF_SIZE_BYTES =
       12800;
+  public static final String RM_DELEGATION_TOKEN_ALWAYS_CANCEL =
+      RM_PREFIX + "delegation-token.always-cancel";
+  public static final boolean DEFAULT_RM_DELEGATION_TOKEN_ALWAYS_CANCEL = false;
 
   public static final String RECOVERY_ENABLED = RM_PREFIX + "recovery.enabled";
   public static final boolean DEFAULT_RM_RECOVERY_ENABLED = false;

+ 10 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml

@@ -784,6 +784,16 @@
     <value>12800</value>
   </property>
 
+  <property>
+    <description>If true, ResourceManager will always try to cancel delegation
+      tokens after the application completes, even if the client sets
+      shouldCancelAtEnd false.  References to delegation tokens are tracked,
+      so they will not be canceled until all sub-tasks are done using them.
+    </description>
+    <name>yarn.resourcemanager.delegation-token.always-cancel</name>
+    <value>false</value>
+  </property>
+
   <property>
   <description>If true, ResourceManager will have proxy-user privileges.
     Use case: In a secure cluster, YARN requires the user hdfs delegation-tokens to

+ 6 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java

@@ -110,6 +110,7 @@ public class DelegationTokenRenewer extends AbstractService {
   private volatile boolean isServiceStarted;
   private LinkedBlockingQueue<DelegationTokenRenewerEvent> pendingEventQueue;
   
+  private boolean alwaysCancelDelegationTokens;
   private boolean tokenKeepAliveEnabled;
   private boolean hasProxyUserPrivileges;
   private long credentialsValidTimeRemaining;
@@ -126,6 +127,9 @@ public class DelegationTokenRenewer extends AbstractService {
 
   @Override
   protected void serviceInit(Configuration conf) throws Exception {
+    this.alwaysCancelDelegationTokens =
+        conf.getBoolean(YarnConfiguration.RM_DELEGATION_TOKEN_ALWAYS_CANCEL,
+            YarnConfiguration.DEFAULT_RM_DELEGATION_TOKEN_ALWAYS_CANCEL);
     this.hasProxyUserPrivileges =
         conf.getBoolean(YarnConfiguration.RM_PROXY_USER_PRIVILEGES_ENABLED,
           YarnConfiguration.DEFAULT_RM_PROXY_USER_PRIVILEGES_ENABLED);
@@ -239,7 +243,7 @@ public class DelegationTokenRenewer extends AbstractService {
    *
    */
   @VisibleForTesting
-  protected static class DelegationTokenToRenew {
+  protected class DelegationTokenToRenew {
     public final Token<?> token;
     public final Collection<ApplicationId> referringAppIds;
     public final Configuration conf;
@@ -269,7 +273,7 @@ public class DelegationTokenRenewer extends AbstractService {
       this.conf = conf;
       this.expirationDate = expirationDate;
       this.timerTask = null;
-      this.shouldCancelAtEnd = shouldCancelAtEnd;
+      this.shouldCancelAtEnd = shouldCancelAtEnd | alwaysCancelDelegationTokens;
     }
     
     public void setTimerTask(RenewalTimerTask tTask) {

+ 72 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java

@@ -201,6 +201,8 @@ public class TestDelegationTokenRenewer {
     counter = new AtomicInteger(0);
     conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
         "kerberos");
+    conf.setBoolean(YarnConfiguration.RM_DELEGATION_TOKEN_ALWAYS_CANCEL,
+        false);
     UserGroupInformation.setConfiguration(conf);
     eventQueue = new LinkedBlockingQueue<Event>();
     dispatcher = new AsyncDispatcher(eventQueue);
@@ -556,6 +558,76 @@ public class TestDelegationTokenRenewer {
     token1.renew(conf);
   }
   
+  /**
+   * Basic idea of the test:
+   * 0. Set YarnConfiguration.RM_DELEGATION_TOKEN_ALWAYS_CANCEL = true, which
+   *    is expected to override shouldCancelAtEnd
+   * 1. register a token for 2 seconds with shouldCancelAtEnd = false
+   * 2. cancel it immediately by finishing the application
+   * 3. check that the token was not renewed and that it was canceled
+   * @throws Exception if the test setup or a token operation fails
+   */
+  @Test(timeout=60000)
+  public void testDTRenewalWithNoCancelAlwaysCancel() throws Exception {
+    Configuration lconf = new Configuration(conf);
+    lconf.setBoolean(YarnConfiguration.RM_DELEGATION_TOKEN_ALWAYS_CANCEL,
+        true);
+
+    DelegationTokenRenewer localDtr =
+        createNewDelegationTokenRenewer(lconf, counter);
+    RMContext mockContext = mock(RMContext.class);
+    when(mockContext.getSystemCredentialsForApps()).thenReturn(
+        new ConcurrentHashMap<ApplicationId, ByteBuffer>());
+    ClientRMService mockClientRMService = mock(ClientRMService.class);
+    when(mockContext.getClientRMService()).thenReturn(mockClientRMService);
+    when(mockContext.getDelegationTokenRenewer()).thenReturn(
+        localDtr);
+    when(mockContext.getDispatcher()).thenReturn(dispatcher);
+    InetSocketAddress sockAddr =
+        InetSocketAddress.createUnresolved("localhost", 1234);
+    when(mockClientRMService.getBindAddress()).thenReturn(sockAddr);
+    localDtr.setRMContext(mockContext);
+    localDtr.init(lconf);
+    localDtr.start();
+
+    MyFS dfs = (MyFS)FileSystem.get(lconf);
+    LOG.info("dfs="+(Object)dfs.hashCode() + ";conf="+lconf.hashCode());
+
+    Credentials ts = new Credentials();
+    MyToken token1 = dfs.getDelegationToken("user1");
+
+    //to cause this one to be set for renew in 2 secs
+    Renewer.tokenToRenewIn2Sec = token1;
+    LOG.info("token="+token1+" should be renewed for 2 secs");
+
+    String nn1 = DelegationTokenRenewer.SCHEME + "://host1:0";
+    ts.addToken(new Text(nn1), token1);
+
+    ApplicationId applicationId = BuilderUtils.newApplicationId(0, 1);
+    localDtr.addApplicationAsync(applicationId, ts, false, "user",
+        new Configuration());
+    waitForEventsToGetProcessed(localDtr);
+    localDtr.applicationFinished(applicationId);
+    waitForEventsToGetProcessed(localDtr);
+
+    int numberOfExpectedRenewals = Renewer.counter; // number of renewals so far
+    try {
+      Thread.sleep(6*1000); // sleep 6 seconds, so it has time to renew
+    } catch (InterruptedException e) {}
+    LOG.info("Counter = " + Renewer.counter + ";t="+ Renewer.lastRenewed);
+
+    // no renewal should have happened while sleeping: counter is unchanged
+    assertEquals("renew wasn't called as many times as expected",
+        numberOfExpectedRenewals, Renewer.counter);
+
+    // The token should have been cancelled at this point. Renewal will fail.
+    try {
+      token1.renew(lconf);
+      fail("Renewal of cancelled token should have failed");
+    } catch (InvalidToken ite) {}
+  }
+
   /**
    * Basic idea of the test:
    * 0. Setup token KEEP_ALIVE