Prechádzať zdrojové kódy

YARN-4041. Slow delegation token renewal can severely prolong RM recovery. Contributed by Sunil G

Jason Lowe 9 rokov pred
rodič
commit
d3a34a4f38

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -1076,6 +1076,9 @@ Release 2.7.2 - UNRELEASED
     YARN-4209. RMStateStore FENCED state doesn’t work due to updateFencedState called 
     by stateMachine.doTransition. (Zhihai Xu via rohithsharmaks)
 
+    YARN-4041. Slow delegation token renewal can severely prolong RM recovery
+    (Sunil G via jlowe)
+
 Release 2.7.1 - 2015-07-06
 
   INCOMPATIBLE CHANGES

+ 8 - 6
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java

@@ -946,14 +946,16 @@ public class RMAppImpl implements RMApp, Recoverable {
       }
 
       if (UserGroupInformation.isSecurityEnabled()) {
-        // synchronously renew delegation token on recovery.
+        // asynchronously renew delegation token on recovery.
         try {
-          app.rmContext.getDelegationTokenRenewer().addApplicationSync(
-            app.getApplicationId(), app.parseCredentials(),
-            app.submissionContext.getCancelTokensWhenComplete(), app.getUser());
+          app.rmContext.getDelegationTokenRenewer()
+              .addApplicationAsyncDuringRecovery(app.getApplicationId(),
+                  app.parseCredentials(),
+                  app.submissionContext.getCancelTokensWhenComplete(),
+                  app.getUser());
         } catch (Exception e) {
-          String msg = "Failed to renew token for " + app.applicationId
-                  + " on recovery : " + e.getMessage();
+          String msg = "Failed to fetch user credentials from application:"
+              + e.getMessage();
           app.diagnostics.append(msg);
           LOG.error(msg, e);
         }

+ 63 - 6
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java

@@ -387,6 +387,25 @@ public class DelegationTokenRenewer extends AbstractService {
       applicationId, ts, shouldCancelAtEnd, user));
   }
 
+  /**
+   * Asynchronously add application tokens for renewal.
+   *
+   * @param applicationId
+   *          added application
+   * @param ts
+   *          tokens
+   * @param shouldCancelAtEnd
+   *          true if tokens should be canceled when the app is done else false.
+   * @param user
+   *          user
+   */
+  public void addApplicationAsyncDuringRecovery(ApplicationId applicationId,
+      Credentials ts, boolean shouldCancelAtEnd, String user) {
+    processDelegationTokenRenewerEvent(
+        new DelegationTokenRenewerAppRecoverEvent(applicationId, ts,
+            shouldCancelAtEnd, user));
+  }
+
   /**
    * Synchronously renew delegation tokens.
    * @param user user
@@ -398,7 +417,7 @@ public class DelegationTokenRenewer extends AbstractService {
       applicationId, ts, shouldCancelAtEnd, user));
   }
 
-  private void handleAppSubmitEvent(DelegationTokenRenewerAppSubmitEvent evt)
+  private void handleAppSubmitEvent(AbstractDelegationTokenRenewerAppEvent evt)
       throws IOException, InterruptedException {
     ApplicationId applicationId = evt.getApplicationId();
     Credentials ts = evt.getCredentials();
@@ -842,6 +861,10 @@ public class DelegationTokenRenewer extends AbstractService {
         DelegationTokenRenewerAppSubmitEvent appSubmitEvt =
             (DelegationTokenRenewerAppSubmitEvent) evt;
         handleDTRenewerAppSubmitEvent(appSubmitEvt);
+      } else if (evt instanceof DelegationTokenRenewerAppRecoverEvent) {
+        DelegationTokenRenewerAppRecoverEvent appRecoverEvt =
+            (DelegationTokenRenewerAppRecoverEvent) evt;
+        handleDTRenewerAppRecoverEvent(appRecoverEvt);
       } else if (evt.getType().equals(
           DelegationTokenRenewerEventType.FINISH_APPLICATION)) {
         DelegationTokenRenewer.this.handleAppFinishEvent(evt);
@@ -876,17 +899,50 @@ public class DelegationTokenRenewer extends AbstractService {
       }
     }
   }
-  
-  static class DelegationTokenRenewerAppSubmitEvent extends
+
+  @SuppressWarnings("unchecked")
+  private void handleDTRenewerAppRecoverEvent(
+      DelegationTokenRenewerAppRecoverEvent event) {
+    try {
+      // Setup tokens for renewal during recovery
+      DelegationTokenRenewer.this.handleAppSubmitEvent(event);
+    } catch (Throwable t) {
+      LOG.warn(
+          "Unable to add the application to the delegation token renewer.", t);
+    }
+  }
+
+  static class DelegationTokenRenewerAppSubmitEvent
+      extends
+        AbstractDelegationTokenRenewerAppEvent {
+    public DelegationTokenRenewerAppSubmitEvent(ApplicationId appId,
+        Credentials credentails, boolean shouldCancelAtEnd, String user) {
+      super(appId, credentails, shouldCancelAtEnd, user,
+          DelegationTokenRenewerEventType.VERIFY_AND_START_APPLICATION);
+    }
+  }
+
+  static class DelegationTokenRenewerAppRecoverEvent
+      extends
+        AbstractDelegationTokenRenewerAppEvent {
+    public DelegationTokenRenewerAppRecoverEvent(ApplicationId appId,
+        Credentials credentails, boolean shouldCancelAtEnd, String user) {
+      super(appId, credentails, shouldCancelAtEnd, user,
+          DelegationTokenRenewerEventType.RECOVER_APPLICATION);
+    }
+  }
+
+  static class AbstractDelegationTokenRenewerAppEvent extends
       DelegationTokenRenewerEvent {
 
     private Credentials credentials;
     private boolean shouldCancelAtEnd;
     private String user;
 
-    public DelegationTokenRenewerAppSubmitEvent(ApplicationId appId,
-        Credentials credentails, boolean shouldCancelAtEnd, String user) {
-      super(appId, DelegationTokenRenewerEventType.VERIFY_AND_START_APPLICATION);
+    public AbstractDelegationTokenRenewerAppEvent(ApplicationId appId,
+        Credentials credentails, boolean shouldCancelAtEnd, String user,
+        DelegationTokenRenewerEventType type) {
+      super(appId, type);
       this.credentials = credentails;
       this.shouldCancelAtEnd = shouldCancelAtEnd;
       this.user = user;
@@ -907,6 +963,7 @@ public class DelegationTokenRenewer extends AbstractService {
   
   enum DelegationTokenRenewerEventType {
     VERIFY_AND_START_APPLICATION,
+    RECOVER_APPLICATION,
     FINISH_APPLICATION
   }
   

+ 12 - 12
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java

@@ -1179,24 +1179,24 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
 
     // Need to wait for a while as now token renewal happens on another thread
     // and is asynchronous in nature.
-    waitForTokensToBeRenewed(rm2);
+    waitForTokensToBeRenewed(rm2, tokenSet);
 
     // verify tokens are properly populated back to rm2 DelegationTokenRenewer
     Assert.assertEquals(tokenSet, rm2.getRMContext()
       .getDelegationTokenRenewer().getDelegationTokens());
   }
 
-  private void waitForTokensToBeRenewed(MockRM rm2) throws Exception {
-    int waitCnt = 20;
-    boolean atleastOneAppInNEWState = true;
-    while (waitCnt-- > 0 && atleastOneAppInNEWState) {
-      atleastOneAppInNEWState = false;
-      for (RMApp rmApp : rm2.getRMContext().getRMApps().values()) {
-        if (rmApp.getState() == RMAppState.NEW) {
-          Thread.sleep(1000);
-          atleastOneAppInNEWState = true;
-          break;
-        }
+  private void waitForTokensToBeRenewed(MockRM rm2,
+      HashSet<Token<RMDelegationTokenIdentifier>> tokenSet) throws Exception {
+    // Max wait time to get the token renewal can be kept as 1sec (100 * 10ms)
+    int waitCnt = 100;
+    while (waitCnt-- > 0) {
+      if (tokenSet.equals(rm2.getRMContext().getDelegationTokenRenewer()
+          .getDelegationTokens())) {
+        // Stop waiting as tokens are populated to DelegationTokenRenewer.
+        break;
+      } else {
+        Thread.sleep(10);
       }
     }
   }