Преглед на файлове

YARN-1866. Fixed an issue with renewal of RM-delegation tokens on restart or fail-over. Contributed by Jian He.
svn merge --ignore-ancestry -c 1581448 ../../trunk/


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2.4@1581454 13f79535-47bb-0310-9956-ffa450edef68

Vinod Kumar Vavilapalli преди 11 години
родител
ревизия
405867b6f6

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -520,6 +520,9 @@ Release 2.4.0 - UNRELEASED
     YARN-1852. Fixed RMAppAttempt to not resend AttemptFailed/AttemptKilled
     events to already recovered Failed/Killed RMApps. (Rohith via jianhe)
 
+    YARN-1866. Fixed an issue with renewal of RM-delegation tokens on restart or
+    fail-over. (Jian He via vinodkv)
+
 Release 2.3.1 - UNRELEASED
 
   INCOMPATIBLE CHANGES

+ 11 - 4
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java

@@ -112,6 +112,8 @@ public class DelegationTokenRenewer extends AbstractService {
     this.tokenRemovalDelayMs =
         conf.getInt(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS,
             YarnConfiguration.DEFAULT_RM_NM_EXPIRY_INTERVAL_MS);
+
+    setLocalSecretManagerAndServiceAddr();
     renewerService = createNewThreadPoolService(conf);
     pendingEventQueue = new LinkedBlockingQueue<DelegationTokenRenewerEvent>();
     renewalTimer = new Timer(true);
@@ -134,6 +136,13 @@ public class DelegationTokenRenewer extends AbstractService {
     return pool;
   }
 
+  // enable RM to short-circuit token operations directly to itself
+  private void setLocalSecretManagerAndServiceAddr() {
+    RMDelegationTokenIdentifier.Renewer.setSecretManager(rmContext
+      .getRMDelegationTokenSecretManager(), rmContext.getClientRMService()
+      .getBindAddress());
+  }
+
   @Override
   protected void serviceStart() throws Exception {
     dtCancelThread.start();
@@ -143,10 +152,8 @@ public class DelegationTokenRenewer extends AbstractService {
               "DelayedTokenCanceller");
       delayedRemovalThread.start();
     }
-    // enable RM to short-circuit token operations directly to itself
-    RMDelegationTokenIdentifier.Renewer.setSecretManager(
-        rmContext.getRMDelegationTokenSecretManager(),
-        rmContext.getClientRMService().getBindAddress());
+
+    setLocalSecretManagerAndServiceAddr();
     serviceStateLock.writeLock().lock();
     isServiceStarted = true;
     serviceStateLock.writeLock().unlock();

+ 24 - 17
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java

@@ -394,7 +394,7 @@ public class TestRMRestart {
     Assert.assertEquals(4, rmAppState.size());
  }
 
-  @Test
+  @Test (timeout = 60000)
   public void testRMRestartAppRunningAMFailed() throws Exception {
     conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
       YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS);
@@ -440,7 +440,7 @@ public class TestRMRestart {
     rm2.stop();
   }
 
-  @Test
+  @Test (timeout = 60000)
   public void testRMRestartWaitForPreviousAMToFinish() throws Exception {
     // testing 3 cases
     // After RM restarts
@@ -607,7 +607,7 @@ public class TestRMRestart {
   // store but before the RMAppAttempt notifies RMApp that it has succeeded. On
   // recovery, RMAppAttempt should send the AttemptFinished event to RMApp so
   // that RMApp can recover its state.
-  @Test
+  @Test (timeout = 60000)
   public void testRMRestartWaitForPreviousSucceededAttempt() throws Exception {
     conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
     MemoryRMStateStore memStore = new MemoryRMStateStore() {
@@ -660,7 +660,7 @@ public class TestRMRestart {
       rmAppState.get(app0.getApplicationId()).getState());
   }
 
-  @Test
+  @Test (timeout = 60000)
   public void testRMRestartFailedApp() throws Exception {
     conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1);
     MemoryRMStateStore memStore = new MemoryRMStateStore();
@@ -709,7 +709,7 @@ public class TestRMRestart {
     rm2.stop();
   }
 
-  @Test
+  @Test (timeout = 60000)
   public void testRMRestartKilledApp() throws Exception{
     conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
       YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS);
@@ -757,7 +757,7 @@ public class TestRMRestart {
     rm2.stop();
   }
 
-  @Test
+  @Test (timeout = 60000)
   public void testRMRestartKilledAppWithNoAttempts() throws Exception {
     MemoryRMStateStore memStore = new MemoryRMStateStore() {
       @Override
@@ -797,7 +797,7 @@ public class TestRMRestart {
     Assert.assertTrue(loadedApp0.getAppAttempts().size() == 0);
   }
 
-  @Test
+  @Test (timeout = 60000)
   public void testRMRestartSucceededApp() throws Exception {
     conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
       YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS);
@@ -849,7 +849,7 @@ public class TestRMRestart {
     rm2.stop();
   }
 
-  @Test
+  @Test (timeout = 60000)
   public void testRMRestartGetApplicationList() throws Exception {
     conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1);
     MemoryRMStateStore memStore = new MemoryRMStateStore();
@@ -997,7 +997,7 @@ public class TestRMRestart {
       appState.getAttempt(am.getApplicationAttemptId()).getState());
   }
 
-  @Test
+  @Test (timeout = 60000)
   public void testRMRestartOnMaxAppAttempts() throws Exception {
     conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
         YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS);
@@ -1071,7 +1071,7 @@ public class TestRMRestart {
     rm2.stop();
   }
 
-  @Test
+  @Test (timeout = 60000)
   public void testDelegationTokenRestoredInDelegationTokenRenewer()
       throws Exception {
     conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
@@ -1171,7 +1171,7 @@ public class TestRMRestart {
     }
   }
 
-  @Test
+  @Test (timeout = 60000)
   public void testAppAttemptTokensRestoredOnRMRestart() throws Exception {
     conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
     conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
@@ -1261,7 +1261,7 @@ public class TestRMRestart {
     rm2.stop();
   }
 
-  @Test
+  @Test (timeout = 60000)
   public void testRMDelegationTokenRestoredOnRMRestart() throws Exception {
     conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
     conf.set(
@@ -1414,7 +1414,7 @@ public class TestRMRestart {
 
   // This is to test submit an application to the new RM with the old delegation
   // token got from previous RM.
-  @Test
+  @Test (timeout = 60000)
   public void testAppSubmissionWithOldDelegationTokenAfterRMRestart()
       throws Exception {
     conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
@@ -1449,7 +1449,7 @@ public class TestRMRestart {
     rm2.waitForState(app.getApplicationId(), RMAppState.ACCEPTED);
   }
 
-  @Test
+  @Test (timeout = 60000)
   public void testRMStateStoreDispatcherDrainedOnRMStop() throws Exception {
     MemoryRMStateStore memStore = new MemoryRMStateStore() {
       volatile boolean wait = true;
@@ -1508,7 +1508,7 @@ public class TestRMRestart {
     Assert.assertTrue(rmAppState.size() == NUM_APPS);
   }
 
-  @Test
+  @Test (timeout = 60000)
   public void testFinishedAppRemovalAfterRMRestart() throws Exception {
     MemoryRMStateStore memStore = new MemoryRMStateStore();
     conf.setInt(YarnConfiguration.RM_MAX_COMPLETED_APPLICATIONS, 1);
@@ -1580,7 +1580,7 @@ public class TestRMRestart {
   // This is to test Killing application should be able to wait until app
   // reaches killed state and also check that attempt state is saved before app
   // state is saved.
-  @Test
+  @Test (timeout = 60000)
   public void testClientRetryOnKillingApplication() throws Exception {
     MemoryRMStateStore memStore = new TestMemoryRMStateStore();
     memStore.init(conf);
@@ -1738,7 +1738,7 @@ public class TestRMRestart {
         appsCompleted + appsCompletedCarryOn);
   }
 
-  @Test
+  @Test (timeout = 60000)
   public void testDecomissionedNMsMetricsOnRMRestart() throws Exception {
     YarnConfiguration conf = new YarnConfiguration();
     conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH,
@@ -1875,6 +1875,13 @@ public class TestRMRestart {
       super(conf, store);
     }
 
+    @Override
+    public void init(Configuration conf) {
+      // reset localServiceAddress.
+      RMDelegationTokenIdentifier.Renewer.setSecretManager(null, null);
+      super.init(conf);
+    }
+
     @Override
     protected ClientRMService createClientRMService() {
       return new ClientRMService(getRMContext(), getResourceScheduler(),

+ 5 - 5
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java

@@ -43,7 +43,6 @@ import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import junit.framework.Assert;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -82,6 +81,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -179,7 +179,6 @@ public class TestDelegationTokenRenewer {
     dispatcher = new AsyncDispatcher(eventQueue);
     Renewer.reset();
     delegationTokenRenewer = createNewDelegationTokenRenewer(conf, counter);
-    delegationTokenRenewer.init(conf);
     RMContext mockContext = mock(RMContext.class);
     ClientRMService mockClientRMService = mock(ClientRMService.class);
     when(mockContext.getDelegationTokenRenewer()).thenReturn(
@@ -190,6 +189,7 @@ public class TestDelegationTokenRenewer {
         InetSocketAddress.createUnresolved("localhost", 1234);
     when(mockClientRMService.getBindAddress()).thenReturn(sockAddr);
     delegationTokenRenewer.setRMContext(mockContext);
+    delegationTokenRenewer.init(conf);
     delegationTokenRenewer.start();
   }
   
@@ -515,7 +515,6 @@ public class TestDelegationTokenRenewer {
         1000l);
     DelegationTokenRenewer localDtr =
         createNewDelegationTokenRenewer(lconf, counter);
-    localDtr.init(lconf);
     RMContext mockContext = mock(RMContext.class);
     ClientRMService mockClientRMService = mock(ClientRMService.class);
     when(mockContext.getClientRMService()).thenReturn(mockClientRMService);
@@ -526,6 +525,7 @@ public class TestDelegationTokenRenewer {
         InetSocketAddress.createUnresolved("localhost", 1234);
     when(mockClientRMService.getBindAddress()).thenReturn(sockAddr);
     localDtr.setRMContext(mockContext);
+    localDtr.init(lconf);
     localDtr.start();
     
     MyFS dfs = (MyFS)FileSystem.get(lconf);
@@ -592,7 +592,6 @@ public class TestDelegationTokenRenewer {
         1000l);
     DelegationTokenRenewer localDtr =
         createNewDelegationTokenRenewer(conf, counter);
-    localDtr.init(lconf);
     RMContext mockContext = mock(RMContext.class);
     ClientRMService mockClientRMService = mock(ClientRMService.class);
     when(mockContext.getClientRMService()).thenReturn(mockClientRMService);
@@ -603,6 +602,7 @@ public class TestDelegationTokenRenewer {
         InetSocketAddress.createUnresolved("localhost", 1234);
     when(mockClientRMService.getBindAddress()).thenReturn(sockAddr);
     localDtr.setRMContext(mockContext);
+    localDtr.init(lconf);
     localDtr.start();
     
     MyFS dfs = (MyFS)FileSystem.get(lconf);
@@ -704,7 +704,6 @@ public class TestDelegationTokenRenewer {
     // fire up the renewer                                                     
     final DelegationTokenRenewer dtr =
         createNewDelegationTokenRenewer(conf, counter);           
-    dtr.init(conf);                                                            
     RMContext mockContext = mock(RMContext.class);                             
     ClientRMService mockClientRMService = mock(ClientRMService.class);         
     when(mockContext.getClientRMService()).thenReturn(mockClientRMService);    
@@ -713,6 +712,7 @@ public class TestDelegationTokenRenewer {
     when(mockClientRMService.getBindAddress()).thenReturn(sockAddr);           
     dtr.setRMContext(mockContext);  
     when(mockContext.getDelegationTokenRenewer()).thenReturn(dtr);
+    dtr.init(conf);
     dtr.start();                                                                           
     // submit a job that blocks during renewal                                 
     Thread submitThread = new Thread() {                                       

+ 11 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewerLifecycle.java

@@ -19,7 +19,12 @@
 package org.apache.hadoop.yarn.server.resourcemanager.security;
 
 
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.server.resourcemanager.ClientRMService;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.junit.Test;
 
 /**
@@ -32,9 +37,13 @@ public class TestDelegationTokenRenewerLifecycle {
   @Test
   public void testStartupFailure() throws Exception {
     Configuration conf = new Configuration();
-    DelegationTokenRenewer delegationTokenRenewer = new DelegationTokenRenewer();
+    DelegationTokenRenewer delegationTokenRenewer =
+        new DelegationTokenRenewer();
+    RMContext mockContext = mock(RMContext.class);
+    ClientRMService mockClientRMService = mock(ClientRMService.class);
+    when(mockContext.getClientRMService()).thenReturn(mockClientRMService);
+    delegationTokenRenewer.setRMContext(mockContext);
     delegationTokenRenewer.init(conf);
     delegationTokenRenewer.stop();
   }
-
 }