Browse Source

YARN-10500. TestDelegationTokenRenewer fails intermittently. (#2619)

(cherry picked from commit f9a073c6c186848e09e2ee04118fd996ea8ace59)
Masatake Iwasaki 4 years ago
parent
commit
4468378e4b

+ 1 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java

@@ -26,7 +26,6 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Date;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
@@ -123,7 +122,7 @@ public class DelegationTokenRenewer extends AbstractService {
   private long tokenRenewerThreadRetryInterval;
   private int tokenRenewerThreadRetryMaxAttempts;
   private final Map<DelegationTokenRenewerEvent, Future<?>> futures =
-      new HashMap<>();
+      new ConcurrentHashMap<>();
   private boolean delegationTokenRenewerPoolTrackerFlag = true;
 
   // this config is supposedly not used by end-users.

+ 82 - 61
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java

@@ -196,6 +196,10 @@ public class TestDelegationTokenRenewer {
 
   private static Configuration conf;
   DelegationTokenRenewer delegationTokenRenewer;
+  private MockRM rm;
+  private MockRM rm1;
+  private MockRM rm2;
+  private DelegationTokenRenewer localDtr;
  
   @BeforeClass
   public static void setUpClass() throws Exception {
@@ -243,13 +247,30 @@ public class TestDelegationTokenRenewer {
   }
   
   @After
-  public void tearDown() {
+  public void tearDown() throws Exception {
     try {
       dispatcher.close();
     } catch (IOException e) {
       LOG.debug("Unable to close the dispatcher. " + e);
     }
     delegationTokenRenewer.stop();
+
+    if (rm != null) {
+      rm.close();
+      rm = null;
+    }
+    if (rm1 != null) {
+      rm1.close();
+      rm1 = null;
+    }
+    if (rm2 != null) {
+      rm2.close();
+      rm2 = null;
+    }
+    if (localDtr != null) {
+      localDtr.close();
+      localDtr = null;
+    }
   }
   
   private static class MyDelegationTokenSecretManager extends DelegationTokenSecretManager {
@@ -371,9 +392,9 @@ public class TestDelegationTokenRenewer {
     return token1;
   }
 
-  private RMApp submitApp(MockRM rm, Credentials cred, ByteBuffer tokensConf)
-      throws Exception {
-    int maxAttempts = rm.getConfig().getInt(
+  private RMApp submitApp(MockRM mockrm,
+      Credentials cred, ByteBuffer tokensConf) throws Exception {
+    int maxAttempts = mockrm.getConfig().getInt(
         YarnConfiguration.RM_AM_MAX_ATTEMPTS,
         YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS);
     MockRMAppSubmissionData data = MockRMAppSubmissionData.Builder.create()
@@ -397,7 +418,7 @@ public class TestDelegationTokenRenewer {
         .withApplicationTimeouts(null)
         .withTokensConf(tokensConf)
         .build();
-    return MockRMAppSubmitter.submit(rm, data);
+    return MockRMAppSubmitter.submit(mockrm, data);
   }
   
   
@@ -626,8 +647,7 @@ public class TestDelegationTokenRenewer {
     lconf.setBoolean(YarnConfiguration.RM_DELEGATION_TOKEN_ALWAYS_CANCEL,
         true);
 
-    DelegationTokenRenewer localDtr =
-        createNewDelegationTokenRenewer(lconf, counter);
+    localDtr = createNewDelegationTokenRenewer(lconf, counter);
     RMContext mockContext = mock(RMContext.class);
     when(mockContext.getSystemCredentialsForApps()).thenReturn(
         new ConcurrentHashMap<ApplicationId, SystemCredentialsForAppsProto>());
@@ -702,8 +722,7 @@ public class TestDelegationTokenRenewer {
     lconf.setLong(
         YarnConfiguration.RM_DELAYED_DELEGATION_TOKEN_REMOVAL_INTERVAL_MS,
         1000l);
-    DelegationTokenRenewer localDtr =
-        createNewDelegationTokenRenewer(lconf, counter);
+    localDtr = createNewDelegationTokenRenewer(lconf, counter);
     RMContext mockContext = mock(RMContext.class);
     when(mockContext.getSystemCredentialsForApps()).thenReturn(
         new ConcurrentHashMap<ApplicationId, SystemCredentialsForAppsProto>());
@@ -783,8 +802,7 @@ public class TestDelegationTokenRenewer {
     lconf.setLong(
         YarnConfiguration.RM_DELAYED_DELEGATION_TOKEN_REMOVAL_INTERVAL_MS,
         1000l);
-    DelegationTokenRenewer localDtr =
-        createNewDelegationTokenRenewer(conf, counter);
+    localDtr = createNewDelegationTokenRenewer(conf, counter);
     RMContext mockContext = mock(RMContext.class);
     when(mockContext.getSystemCredentialsForApps()).thenReturn(
         new ConcurrentHashMap<ApplicationId, SystemCredentialsForAppsProto>());
@@ -889,8 +907,7 @@ public class TestDelegationTokenRenewer {
     doThrow(new IOException("boom"))
         .when(tokenx).renew(any(Configuration.class));
       // fire up the renewer
-    final DelegationTokenRenewer dtr =
-         createNewDelegationTokenRenewer(conf, counter);
+    localDtr = createNewDelegationTokenRenewer(conf, counter);
     RMContext mockContext = mock(RMContext.class);
     when(mockContext.getSystemCredentialsForApps()).thenReturn(
         new ConcurrentHashMap<ApplicationId, SystemCredentialsForAppsProto>());
@@ -900,13 +917,14 @@ public class TestDelegationTokenRenewer {
     InetSocketAddress sockAddr =
         InetSocketAddress.createUnresolved("localhost", 1234);
     when(mockClientRMService.getBindAddress()).thenReturn(sockAddr);
-    dtr.setRMContext(mockContext);
-    when(mockContext.getDelegationTokenRenewer()).thenReturn(dtr);
-    dtr.init(conf);
-    dtr.start();
+    localDtr.setRMContext(mockContext);
+    when(mockContext.getDelegationTokenRenewer()).thenReturn(localDtr);
+    localDtr.init(conf);
+    localDtr.start();
 
     try {
-      dtr.addApplicationSync(mock(ApplicationId.class), credsx, false, "user");
+      localDtr.addApplicationSync(mock(ApplicationId.class),
+          credsx, false, "user");
       fail("Catch IOException on app submission");
     } catch (IOException e){
       Assert.assertTrue(e.getMessage().contains(tokenx.toString()));
@@ -949,8 +967,8 @@ public class TestDelegationTokenRenewer {
     doReturn(Long.MAX_VALUE).when(token2).renew(any(Configuration.class));     
                                                                                
     // fire up the renewer                                                     
-    final DelegationTokenRenewer dtr =
-        createNewDelegationTokenRenewer(conf, counter);           
+    localDtr = createNewDelegationTokenRenewer(conf, counter);
+
     RMContext mockContext = mock(RMContext.class);
     when(mockContext.getSystemCredentialsForApps()).thenReturn(
         new ConcurrentHashMap<ApplicationId, SystemCredentialsForAppsProto>());
@@ -960,24 +978,24 @@ public class TestDelegationTokenRenewer {
     InetSocketAddress sockAddr =                                               
         InetSocketAddress.createUnresolved("localhost", 1234);                 
     when(mockClientRMService.getBindAddress()).thenReturn(sockAddr);           
-    dtr.setRMContext(mockContext);  
-    when(mockContext.getDelegationTokenRenewer()).thenReturn(dtr);
-    dtr.init(conf);
-    dtr.start();                                                                           
+    localDtr.setRMContext(mockContext);
+    when(mockContext.getDelegationTokenRenewer()).thenReturn(localDtr);
+    localDtr.init(conf);
+    localDtr.start();
     // submit a job that blocks during renewal                                 
     Thread submitThread = new Thread() {                                       
       @Override                                                                
       public void run() {
-        dtr.addApplicationAsync(mock(ApplicationId.class), creds1, false, "user",
-            new Configuration());
+        localDtr.addApplicationAsync(mock(ApplicationId.class),
+            creds1, false, "user", new Configuration());
       }                                                                        
     };                                                                         
     submitThread.start();                                                      
                                                                                
     // wait till 1st submit blocks, then submit another
     startBarrier.await();                           
-    dtr.addApplicationAsync(mock(ApplicationId.class), creds2, false, "user",
-        new Configuration());
+    localDtr.addApplicationAsync(mock(ApplicationId.class),
+        creds2, false, "user", new Configuration());
     // signal 1st to complete                                                  
     endBarrier.await();                                                        
     submitThread.join(); 
@@ -990,7 +1008,7 @@ public class TestDelegationTokenRenewer {
         CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
         "kerberos");
     UserGroupInformation.setConfiguration(conf);
-    MockRM rm = new MockRM(conf) {
+    rm = new MockRM(conf) {
       @Override
       protected void doSecureLogin() throws IOException {
         // Skip the login.
@@ -1046,7 +1064,7 @@ public class TestDelegationTokenRenewer {
         new Token<DelegationTokenIdentifier>(dtId2.getBytes(),
           "password2".getBytes(), dtId2.getKind(), new Text("service2"));
 
-    final MockRM rm = new TestSecurityMockRM(conf, null) {
+    rm = new TestSecurityMockRM(conf, null) {
       @Override
       protected DelegationTokenRenewer createDelegationTokenRenewer() {
         return new DelegationTokenRenewer() {
@@ -1149,7 +1167,7 @@ public class TestDelegationTokenRenewer {
     Credentials credentials = new Credentials();
     credentials.addToken(userText1, originalToken);
 
-    MockRM rm1 = new TestSecurityMockRM(yarnConf);
+    rm1 = new TestSecurityMockRM(yarnConf);
     MemoryRMStateStore memStore = (MemoryRMStateStore) rm1.getRMStateStore();
     rm1.start();
     RMApp app = MockRMAppSubmitter.submit(rm1,
@@ -1173,7 +1191,7 @@ public class TestDelegationTokenRenewer {
             "password2".getBytes(), dtId2.getKind(), new Text("service2"));
     AtomicBoolean firstRenewInvoked = new AtomicBoolean(false);
     AtomicBoolean secondRenewInvoked = new AtomicBoolean(false);
-    MockRM rm2 = new TestSecurityMockRM(yarnConf, memStore) {
+    rm2 = new TestSecurityMockRM(yarnConf, memStore) {
       @Override
       protected DelegationTokenRenewer createDelegationTokenRenewer() {
         return new DelegationTokenRenewer() {
@@ -1183,8 +1201,8 @@ public class TestDelegationTokenRenewer {
               throws IOException {
 
             if (dttr.token.equals(updatedToken)) {
-              secondRenewInvoked.set(true);
               super.renewToken(dttr);
+              secondRenewInvoked.set(true);
             } else if (dttr.token.equals(originalToken)){
               firstRenewInvoked.set(true);
               throw new InvalidToken("Failed to renew");
@@ -1210,6 +1228,9 @@ public class TestDelegationTokenRenewer {
     final MockNM nm1 =
         new MockNM("127.0.0.1:1234", 15120, rm2.getResourceTrackerService());
     nm1.registerNode();
+
+    GenericTestUtils.waitFor(() -> secondRenewInvoked.get(), 100, 10000);
+
     NodeHeartbeatResponse response = nm1.nodeHeartbeat(true);
 
     NodeHeartbeatResponse proto = new NodeHeartbeatResponsePBImpl(
@@ -1241,7 +1262,7 @@ public class TestDelegationTokenRenewer {
     final Token<DelegationTokenIdentifier> token2 =
         new Token<DelegationTokenIdentifier>(dtId2.getBytes(),
           "password2".getBytes(), dtId2.getKind(), new Text("service2"));
-    final MockRM rm = new TestSecurityMockRM(conf, null) {
+    rm = new TestSecurityMockRM(conf, null) {
       @Override
       protected DelegationTokenRenewer createDelegationTokenRenewer() {
         return new DelegationTokenRenewer() {
@@ -1293,7 +1314,7 @@ public class TestDelegationTokenRenewer {
   // submitted application.
   @Test (timeout = 30000)
   public void testAppSubmissionWithPreviousToken() throws Exception{
-    MockRM rm = new TestSecurityMockRM(conf, null);
+    rm = new TestSecurityMockRM(conf, null);
     rm.start();
     final MockNM nm1 =
         new MockNM("127.0.0.1:1234", 15120, rm.getResourceTrackerService());
@@ -1369,7 +1390,7 @@ public class TestDelegationTokenRenewer {
   // complete
   @Test (timeout = 30000)
   public void testCancelWithMultipleAppSubmissions() throws Exception{
-    MockRM rm = new TestSecurityMockRM(conf, null);
+    rm = new TestSecurityMockRM(conf, null);
     rm.start();
     final MockNM nm1 =
         new MockNM("127.0.0.1:1234", 15120, rm.getResourceTrackerService());
@@ -1484,10 +1505,10 @@ public class TestDelegationTokenRenewer {
     Assert.assertFalse(renewer.getDelegationTokens().contains(token1));
   }
 
-  private void finishAMAndWaitForComplete(final RMApp app, MockRM rm,
-      MockNM nm, MockAM am, final DelegationTokenToRenew dttr)
+  private void finishAMAndWaitForComplete(final RMApp app, MockRM mockrm,
+      MockNM mocknm, MockAM mockam, final DelegationTokenToRenew dttr)
           throws Exception {
-    MockRM.finishAMAndVerifyAppState(app, rm, nm, am);
+    MockRM.finishAMAndVerifyAppState(app, mockrm, mocknm, mockam);
     GenericTestUtils.waitFor(new Supplier<Boolean>() {
       public Boolean get() {
         return !dttr.referringAppIds.contains(app.getApplicationId());
@@ -1503,7 +1524,7 @@ public class TestDelegationTokenRenewer {
         "kerberos");
     UserGroupInformation.setConfiguration(conf);
 
-    MockRM rm = new TestSecurityMockRM(conf, null);
+    rm = new TestSecurityMockRM(conf, null);
     rm.start();
     final MockNM nm1 =
         new MockNM("127.0.0.1:1234", 15120, rm.getResourceTrackerService());
@@ -1558,7 +1579,7 @@ public class TestDelegationTokenRenewer {
     UserGroupInformation.setConfiguration(conf);
     // limit 100 bytes
     conf.setInt(YarnConfiguration.RM_DELEGATION_TOKEN_MAX_CONF_SIZE, 100);
-    MockRM rm = new TestSecurityMockRM(conf, null);
+    rm = new TestSecurityMockRM(conf, null);
     rm.start();
     final MockNM nm1 =
         new MockNM("127.0.0.1:1234", 15120, rm.getResourceTrackerService());
@@ -1621,7 +1642,7 @@ public class TestDelegationTokenRenewer {
    */
   @Test
   public void testShutDown() {
-    DelegationTokenRenewer dtr = createNewDelegationTokenRenewer(conf, counter);
+    localDtr = createNewDelegationTokenRenewer(conf, counter);
     RMContext mockContext = mock(RMContext.class);
     when(mockContext.getSystemCredentialsForApps()).thenReturn(
         new ConcurrentHashMap<ApplicationId, SystemCredentialsForAppsProto>());
@@ -1631,10 +1652,10 @@ public class TestDelegationTokenRenewer {
     InetSocketAddress sockAddr =
         InetSocketAddress.createUnresolved("localhost", 1234);
     when(mockClientRMService.getBindAddress()).thenReturn(sockAddr);
-    dtr.setRMContext(mockContext);
-    when(mockContext.getDelegationTokenRenewer()).thenReturn(dtr);
-    dtr.init(conf);
-    dtr.start();
+    localDtr.setRMContext(mockContext);
+    when(mockContext.getDelegationTokenRenewer()).thenReturn(localDtr);
+    localDtr.init(conf);
+    localDtr.start();
     delegationTokenRenewer.stop();
     delegationTokenRenewer.applicationFinished(
         BuilderUtils.newApplicationId(0, 1));
@@ -1656,7 +1677,7 @@ public class TestDelegationTokenRenewer {
             "password2".getBytes(), dtId1.getKind(), new Text("service2"));
 
     // fire up the renewer
-    final DelegationTokenRenewer dtr = new DelegationTokenRenewer() {
+    localDtr = new DelegationTokenRenewer() {
       @Override
       protected Token<?>[] obtainSystemTokensForUser(String user,
           final Credentials credentials) throws IOException {
@@ -1674,25 +1695,25 @@ public class TestDelegationTokenRenewer {
     InetSocketAddress sockAddr =
         InetSocketAddress.createUnresolved("localhost", 1234);
     when(mockClientRMService.getBindAddress()).thenReturn(sockAddr);
-    dtr.setRMContext(mockContext);
-    when(mockContext.getDelegationTokenRenewer()).thenReturn(dtr);
-    dtr.init(conf);
-    dtr.start();
+    localDtr.setRMContext(mockContext);
+    when(mockContext.getDelegationTokenRenewer()).thenReturn(localDtr);
+    localDtr.init(conf);
+    localDtr.start();
 
     final ApplicationId appId1 = ApplicationId.newInstance(1234, 1);
 
     Collection<ApplicationId> appIds = new ArrayList<ApplicationId>(1);
     appIds.add(appId1);
 
-    dtr.addApplicationSync(appId1, credsx, false, "user1");
+    localDtr.addApplicationSync(appId1, credsx, false, "user1");
 
     // Ensure incrTokenSequenceNo has been called for new token request
     Mockito.verify(mockContext, Mockito.times(1)).incrTokenSequenceNo();
 
-    DelegationTokenToRenew dttr = dtr.new DelegationTokenToRenew(appIds,
+    DelegationTokenToRenew dttr = localDtr.new DelegationTokenToRenew(appIds,
         expectedToken, conf, 1000, false, "user1");
 
-    dtr.requestNewHdfsDelegationTokenIfNeeded(dttr);
+    localDtr.requestNewHdfsDelegationTokenIfNeeded(dttr);
 
     // Ensure incrTokenSequenceNo has been called for token renewal as well.
     Mockito.verify(mockContext, Mockito.times(2)).incrTokenSequenceNo();
@@ -1710,16 +1731,17 @@ public class TestDelegationTokenRenewer {
   @Test(timeout = 30000)
   public void testTokenThreadTimeout() throws Exception {
     Configuration yarnConf = new YarnConfiguration();
+    yarnConf.set("override_token_expire_time", "30000");
     yarnConf.setBoolean(YarnConfiguration.RM_PROXY_USER_PRIVILEGES_ENABLED,
         true);
     yarnConf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
         "kerberos");
     yarnConf.setClass(YarnConfiguration.RM_STORE, MemoryRMStateStore.class,
         RMStateStore.class);
-    yarnConf.setTimeDuration(YarnConfiguration.RM_DT_RENEWER_THREAD_TIMEOUT, 5,
+    yarnConf.setTimeDuration(YarnConfiguration.RM_DT_RENEWER_THREAD_TIMEOUT, 2,
         TimeUnit.SECONDS);
     yarnConf.setTimeDuration(
-        YarnConfiguration.RM_DT_RENEWER_THREAD_RETRY_INTERVAL, 5,
+        YarnConfiguration.RM_DT_RENEWER_THREAD_RETRY_INTERVAL, 0,
         TimeUnit.SECONDS);
     yarnConf.setInt(YarnConfiguration.RM_DT_RENEWER_THREAD_RETRY_MAX_ATTEMPTS,
         3);
@@ -1743,7 +1765,7 @@ public class TestDelegationTokenRenewer {
     DelegationTokenRenewer renewer = createNewDelegationTokenRenewerForTimeout(
         yarnConf, threadCounter, renewDelay);
 
-    MockRM rm = new TestSecurityMockRM(yarnConf) {
+    rm = new TestSecurityMockRM(yarnConf) {
       @Override
       protected DelegationTokenRenewer createDelegationTokenRenewer() {
         return renewer;
@@ -1766,8 +1788,7 @@ public class TestDelegationTokenRenewer {
         YarnConfiguration.RM_DT_RENEWER_THREAD_RETRY_MAX_ATTEMPTS,
         YarnConfiguration.DEFAULT_RM_DT_RENEWER_THREAD_RETRY_MAX_ATTEMPTS);
 
-    GenericTestUtils.waitFor(() -> threadCounter.get() >= attempts, 2000,
-        30000);
+    GenericTestUtils.waitFor(() -> threadCounter.get() >= attempts, 100, 20000);
 
     // Ensure no. of threads has been used in renewer service thread pool is
     // higher than the configured max retry attempts
@@ -1816,7 +1837,7 @@ public class TestDelegationTokenRenewer {
     DelegationTokenRenewer renwer = createNewDelegationTokenRenewerForTimeout(
         yarnConf, threadCounter, renewDelay);
 
-    MockRM rm = new TestSecurityMockRM(yarnConf) {
+    rm = new TestSecurityMockRM(yarnConf) {
       @Override
       protected DelegationTokenRenewer createDelegationTokenRenewer() {
         return renwer;
@@ -1880,4 +1901,4 @@ public class TestDelegationTokenRenewer {
     renew.setDelegationTokenRenewerPoolTracker(true);
     return renew;
   }
-}
+}