Browse Source

YARN-3055. Fixed ResourceManager's DelegationTokenRenewer to not stop token renewal of applications part of a bigger workflow. Contributed by Daryn Sharp.

(cherry picked from commit 9c5911294e0ba71aefe4763731b0e780cde9d0ca)
(cherry picked from commit 1ff3fd33ed6f2ac09c774cc42b0107c5dbd9c19d)
(cherry picked from commit 82c722aae86669325672dd10840447434f15e7fd)
Vinod Kumar Vavilapalli 10 năm trước cách đây
mục cha
commit
752e3da738

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -120,6 +120,9 @@ Release 2.6.1 - UNRELEASED
     YARN-3393. Getting application(s) goes wrong when app finishes before
     starting the attempt. (Zhijie Shen via xgong)
 
+    YARN-3055. Fixed ResourceManager's DelegationTokenRenewer to not stop token
+    renewal of applications part of a bigger workflow. (Daryn Sharp via vinodkv)
+
 Release 2.6.0 - 2014-11-18
 
   INCOMPATIBLE CHANGES

+ 84 - 53
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java

@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Date;
@@ -229,15 +230,16 @@ public class DelegationTokenRenewer extends AbstractService {
   @VisibleForTesting
   protected static class DelegationTokenToRenew {
     public final Token<?> token;
-    public final ApplicationId applicationId;
+    public final Collection<ApplicationId> referringAppIds;
     public final Configuration conf;
     public long expirationDate;
-    public TimerTask timerTask;
+    public RenewalTimerTask timerTask;
     public volatile boolean shouldCancelAtEnd;
     public long maxDate;
     public String user;
 
-    public DelegationTokenToRenew(ApplicationId jId, Token<?> token,
+    public DelegationTokenToRenew(Collection<ApplicationId> applicationIds,
+        Token<?> token,
         Configuration conf, long expirationDate, boolean shouldCancelAtEnd,
         String user) {
       this.token = token;
@@ -251,20 +253,33 @@ public class DelegationTokenRenewer extends AbstractService {
           throw new YarnRuntimeException(e);
         }
       }
-      this.applicationId = jId;
+      this.referringAppIds = Collections.synchronizedSet(
+          new HashSet<ApplicationId>(applicationIds));
       this.conf = conf;
       this.expirationDate = expirationDate;
       this.timerTask = null;
       this.shouldCancelAtEnd = shouldCancelAtEnd;
     }
     
-    public void setTimerTask(TimerTask tTask) {
+    public void setTimerTask(RenewalTimerTask tTask) {
       timerTask = tTask;
     }
-    
+
+    @VisibleForTesting
+    public void cancelTimer() {
+      if (timerTask != null) {
+        timerTask.cancel();
+      }
+    }
+
+    @VisibleForTesting
+    public boolean isTimerCancelled() {
+      return (timerTask != null) && timerTask.cancelled.get();
+    }
+
     @Override
     public String toString() {
-      return token + ";exp=" + expirationDate;
+      return token + ";exp=" + expirationDate + "; apps=" + referringAppIds;
     }
     
     @Override
@@ -416,19 +431,16 @@ public class DelegationTokenRenewer extends AbstractService {
         }
 
         DelegationTokenToRenew dttr = allTokens.get(token);
-        if (dttr != null) {
-          // If any of the jobs sharing the same token doesn't want to cancel
-          // the token, we should not cancel the token.
-          if (!evt.shouldCancelAtEnd) {
-            dttr.shouldCancelAtEnd = evt.shouldCancelAtEnd;
-            LOG.info("Set shouldCancelAtEnd=" + shouldCancelAtEnd
-                + " for token " + dttr.token);
+        if (dttr == null) {
+          dttr = new DelegationTokenToRenew(Arrays.asList(applicationId), token,
+              getConfig(), now, shouldCancelAtEnd, evt.getUser());
+          try {
+            renewToken(dttr);
+          } catch (IOException ioe) {
+            throw new IOException("Failed to renew token: " + dttr.token, ioe);
           }
-          continue;
         }
-
-        tokenList.add(new DelegationTokenToRenew(applicationId, token,
-          getConfig(), now, shouldCancelAtEnd, evt.getUser()));
+        tokenList.add(dttr);
       }
     }
 
@@ -437,21 +449,21 @@ public class DelegationTokenRenewer extends AbstractService {
       // If user provides incorrect token then it should not be added for
       // renewal.
       for (DelegationTokenToRenew dtr : tokenList) {
-        try {
-          renewToken(dtr);
-        } catch (IOException ioe) {
-          throw new IOException("Failed to renew token: " + dtr.token, ioe);
+        DelegationTokenToRenew currentDtr =
+            allTokens.putIfAbsent(dtr.token, dtr);
+        if (currentDtr != null) {
+          // another job beat us
+          currentDtr.referringAppIds.add(applicationId);
+          appTokens.get(applicationId).add(currentDtr);
+        } else {
+          appTokens.get(applicationId).add(dtr);
+          setTimerForTokenRenewal(dtr);
         }
       }
-      for (DelegationTokenToRenew dtr : tokenList) {
-        appTokens.get(applicationId).add(dtr);
-        allTokens.put(dtr.token, dtr);
-        setTimerForTokenRenewal(dtr);
-      }
     }
 
     if (!hasHdfsToken) {
-      requestNewHdfsDelegationToken(applicationId, evt.getUser(),
+      requestNewHdfsDelegationToken(Arrays.asList(applicationId), evt.getUser(),
         shouldCancelAtEnd);
     }
   }
@@ -479,7 +491,7 @@ public class DelegationTokenRenewer extends AbstractService {
       try {
         requestNewHdfsDelegationTokenIfNeeded(dttr);
         // if the token is not replaced by a new token, renew the token
-        if (appTokens.get(dttr.applicationId).contains(dttr)) {
+        if (!dttr.isTimerCancelled()) {
           renewToken(dttr);
           setTimerForTokenRenewal(dttr);// set the next one
         } else {
@@ -509,12 +521,12 @@ public class DelegationTokenRenewer extends AbstractService {
     long expiresIn = token.expirationDate - System.currentTimeMillis();
     long renewIn = token.expirationDate - expiresIn/10; // little bit before the expiration
     // need to create new task every time
-    TimerTask tTask = new RenewalTimerTask(token);
+    RenewalTimerTask tTask = new RenewalTimerTask(token);
     token.setTimerTask(tTask); // keep reference to the timer
 
     renewalTimer.schedule(token.timerTask, new Date(renewIn));
     LOG.info("Renew " + token + " in " + expiresIn + " ms, appId = "
-        + token.applicationId);
+        + token.referringAppIds);
   }
 
   // renew a token
@@ -536,7 +548,7 @@ public class DelegationTokenRenewer extends AbstractService {
       throw new IOException(e);
     }
     LOG.info("Renewed delegation-token= [" + dttr + "], for "
-        + dttr.applicationId);
+        + dttr.referringAppIds);
   }
 
   // Request new hdfs token if the token is about to expire, and remove the old
@@ -549,30 +561,37 @@ public class DelegationTokenRenewer extends AbstractService {
         && dttr.maxDate - dttr.expirationDate < credentialsValidTimeRemaining
         && dttr.token.getKind().equals(new Text("HDFS_DELEGATION_TOKEN"))) {
 
+      final Collection<ApplicationId> applicationIds;
+      synchronized (dttr.referringAppIds) {
+        applicationIds = new HashSet<ApplicationId>(dttr.referringAppIds);
+        dttr.referringAppIds.clear();
+      }
       // remove all old expiring hdfs tokens for this application.
-      Set<DelegationTokenToRenew> tokenSet = appTokens.get(dttr.applicationId);
-      if (tokenSet != null && !tokenSet.isEmpty()) {
+      for (ApplicationId appId : applicationIds) {
+        Set<DelegationTokenToRenew> tokenSet = appTokens.get(appId);
+        if (tokenSet == null || tokenSet.isEmpty()) {
+          continue;
+        }
         Iterator<DelegationTokenToRenew> iter = tokenSet.iterator();
         synchronized (tokenSet) {
           while (iter.hasNext()) {
             DelegationTokenToRenew t = iter.next();
             if (t.token.getKind().equals(new Text("HDFS_DELEGATION_TOKEN"))) {
               iter.remove();
-              if (t.timerTask != null) {
-                t.timerTask.cancel();
-              }
+              t.cancelTimer();
               LOG.info("Removed expiring token " + t);
             }
           }
         }
       }
       LOG.info("Token= (" + dttr + ") is expiring, request new token.");
-      requestNewHdfsDelegationToken(dttr.applicationId, dttr.user,
-        dttr.shouldCancelAtEnd);
+      requestNewHdfsDelegationToken(applicationIds, dttr.user,
+          dttr.shouldCancelAtEnd);
     }
   }
 
-  private void requestNewHdfsDelegationToken(ApplicationId applicationId,
+  private void requestNewHdfsDelegationToken(
+      Collection<ApplicationId> referringAppIds,
       String user, boolean shouldCancelAtEnd) throws IOException,
       InterruptedException {
     if (!hasProxyUserPrivileges) {
@@ -584,18 +603,20 @@ public class DelegationTokenRenewer extends AbstractService {
     Token<?>[] newTokens = obtainSystemTokensForUser(user, credentials);
 
     // Add new tokens to the toRenew list.
-    LOG.info("Received new tokens for " + applicationId + ". Received "
+    LOG.info("Received new tokens for " + referringAppIds + ". Received "
         + newTokens.length + " tokens.");
     if (newTokens.length > 0) {
       for (Token<?> token : newTokens) {
         if (token.isManaged()) {
           DelegationTokenToRenew tokenToRenew =
-              new DelegationTokenToRenew(applicationId, token, getConfig(),
+              new DelegationTokenToRenew(referringAppIds, token, getConfig(),
                 Time.now(), shouldCancelAtEnd, user);
           // renew the token to get the next expiration date.
           renewToken(tokenToRenew);
           setTimerForTokenRenewal(tokenToRenew);
-          appTokens.get(applicationId).add(tokenToRenew);
+          for (ApplicationId applicationId : referringAppIds) {
+            appTokens.get(applicationId).add(tokenToRenew);
+          }
           LOG.info("Received new token " + token);
         }
       }
@@ -603,7 +624,9 @@ public class DelegationTokenRenewer extends AbstractService {
     DataOutputBuffer dob = new DataOutputBuffer();
     credentials.writeTokenStorageToStream(dob);
     ByteBuffer byteBuffer = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
-    rmContext.getSystemCredentialsForApps().put(applicationId, byteBuffer);
+    for (ApplicationId applicationId : referringAppIds) {
+      rmContext.getSystemCredentialsForApps().put(applicationId, byteBuffer);
+    }
   }
 
   protected Token<?>[] obtainSystemTokensForUser(String user,
@@ -637,16 +660,18 @@ public class DelegationTokenRenewer extends AbstractService {
    * @param applicationId
    */
   private void removeFailedDelegationToken(DelegationTokenToRenew t) {
-    ApplicationId applicationId = t.applicationId;
-    LOG.error("removing failed delegation token for appid=" + applicationId
-        + ";t=" + t.token.getService());
-    appTokens.get(applicationId).remove(t);
+    Collection<ApplicationId> applicationIds = t.referringAppIds;
+    synchronized (applicationIds) {
+      LOG.error("removing failed delegation token for appid=" + applicationIds
+          + ";t=" + t.token.getService());
+      for (ApplicationId applicationId : applicationIds) {
+        appTokens.get(applicationId).remove(t);
+      }
+    }
     allTokens.remove(t.token);
 
     // cancel the timer
-    if (t.timerTask != null) {
-      t.timerTask.cancel();
-    }
+    t.cancelTimer();
   }
 
   /**
@@ -699,9 +724,15 @@ public class DelegationTokenRenewer extends AbstractService {
                 + "; token=" + dttr.token.getService());
           }
 
+          // continue if the app list isn't empty
+          synchronized(dttr.referringAppIds) {
+            dttr.referringAppIds.remove(applicationId);
+            if (!dttr.referringAppIds.isEmpty()) {
+              continue;
+            }
+          }
           // cancel the timer
-          if (dttr.timerTask != null)
-            dttr.timerTask.cancel();
+          dttr.cancelTimer();
 
           // cancel the token
           cancelToken(dttr);

+ 86 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java

@@ -89,6 +89,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer.DelegationTokenToRenew;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.junit.After;
 import org.junit.Assert;
@@ -123,7 +124,7 @@ public class TestDelegationTokenRenewer {
       counter = 0;
       lastRenewed = null;
       tokenToRenewIn2Sec = null;
-
+      cancelled = false;
     }
 
     @Override
@@ -1022,4 +1023,88 @@ public class TestDelegationTokenRenewer {
     // app2 completes, app1 is still running, check the token is not cancelled
     Assert.assertFalse(Renewer.cancelled);
   }
+  
+  // Test submitting an application with the token obtained by a previously
+  // submitted application that is set to be cancelled.  Token should be
+  // renewed while all apps are running, and then cancelled when all apps
+  // complete
+  @Test (timeout = 30000)
+  public void testCancelWithMultipleAppSubmissions() throws Exception{
+    MockRM rm = new TestSecurityMockRM(conf, null);
+    rm.start();
+    final MockNM nm1 =
+        new MockNM("127.0.0.1:1234", 15120, rm.getResourceTrackerService());
+    nm1.registerNode();
+
+    //MyFS fs = (MyFS)FileSystem.get(conf);
+    //MyToken token1 = fs.getDelegationToken("user123");
+
+    // create Token1:
+    Text userText1 = new Text("user");
+    DelegationTokenIdentifier dtId1 =
+        new DelegationTokenIdentifier(userText1, new Text("renewer1"),
+          userText1);
+    final Token<DelegationTokenIdentifier> token1 =
+        new Token<DelegationTokenIdentifier>(dtId1.getBytes(),
+          "password1".getBytes(), dtId1.getKind(), new Text("service1"));
+
+    Credentials credentials = new Credentials();
+    credentials.addToken(token1.getService(), token1);
+
+    DelegationTokenRenewer renewer =
+        rm.getRMContext().getDelegationTokenRenewer();
+    Assert.assertTrue(renewer.getAllTokens().isEmpty());
+    Assert.assertFalse(Renewer.cancelled);
+
+    RMApp app1 =
+        rm.submitApp(200, "name", "user", null, false, null, 2, credentials,
+          null, true, false, false, null, 0, null, true);
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1);
+    rm.waitForState(app1.getApplicationId(), RMAppState.RUNNING);
+
+    DelegationTokenToRenew dttr = renewer.getAllTokens().get(token1);
+    Assert.assertNotNull(dttr);
+    Assert.assertTrue(dttr.referringAppIds.contains(app1.getApplicationId()));
+    RMApp app2 =
+        rm.submitApp(200, "name", "user", null, false, null, 2, credentials,
+          null, true, false, false, null, 0, null, true);
+    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm, nm1);
+    rm.waitForState(app2.getApplicationId(), RMAppState.RUNNING);
+    Assert.assertTrue(renewer.getAllTokens().containsKey(token1));
+    Assert.assertTrue(dttr.referringAppIds.contains(app2.getApplicationId()));
+    Assert.assertTrue(dttr.referringAppIds.contains(app2.getApplicationId()));
+    Assert.assertFalse(Renewer.cancelled);
+
+    MockRM.finishAMAndVerifyAppState(app2, rm, nm1, am2);
+    // app2 completes, app1 is still running, check the token is not cancelled
+    Assert.assertTrue(renewer.getAllTokens().containsKey(token1));
+    Assert.assertTrue(dttr.referringAppIds.contains(app1.getApplicationId()));
+    Assert.assertFalse(dttr.referringAppIds.contains(app2.getApplicationId()));
+    Assert.assertFalse(dttr.isTimerCancelled());
+    Assert.assertFalse(Renewer.cancelled);
+
+    RMApp app3 =
+        rm.submitApp(200, "name", "user", null, false, null, 2, credentials,
+          null, true, false, false, null, 0, null, true);
+    MockAM am3 = MockRM.launchAndRegisterAM(app3, rm, nm1);
+    rm.waitForState(app3.getApplicationId(), RMAppState.RUNNING);
+    Assert.assertTrue(renewer.getAllTokens().containsKey(token1));
+    Assert.assertTrue(dttr.referringAppIds.contains(app1.getApplicationId()));
+    Assert.assertTrue(dttr.referringAppIds.contains(app3.getApplicationId()));
+    Assert.assertFalse(dttr.isTimerCancelled());
+    Assert.assertFalse(Renewer.cancelled);
+
+    MockRM.finishAMAndVerifyAppState(app1, rm, nm1, am1);
+    Assert.assertTrue(renewer.getAllTokens().containsKey(token1));
+    Assert.assertFalse(dttr.referringAppIds.contains(app1.getApplicationId()));
+    Assert.assertTrue(dttr.referringAppIds.contains(app3.getApplicationId()));
+    Assert.assertFalse(dttr.isTimerCancelled());
+    Assert.assertFalse(Renewer.cancelled);
+
+    MockRM.finishAMAndVerifyAppState(app3, rm, nm1, am3);
+    Assert.assertFalse(renewer.getAllTokens().containsKey(token1));
+    Assert.assertTrue(dttr.referringAppIds.isEmpty());
+    Assert.assertTrue(dttr.isTimerCancelled());
+    Assert.assertTrue(Renewer.cancelled);
+  }
 }