Procházet zdrojové kódy

YARN-9768. RM Renew Delegation token thread should timeout and retry. Contributed by Manikandan R.

Inigo Goiri před 5 roky
rodič
revize
0696828a09

+ 14 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java

@@ -26,6 +26,7 @@ import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -729,6 +730,19 @@ public class YarnConfiguration extends Configuration {
   public static final int DEFAULT_RM_DELEGATION_TOKEN_MAX_CONF_SIZE_BYTES =
       12800;
 
+  public static final String RM_DT_RENEWER_THREAD_TIMEOUT =
+      RM_PREFIX + "delegation-token-renewer.thread-timeout";
+  public static final long DEFAULT_RM_DT_RENEWER_THREAD_TIMEOUT =
+      TimeUnit.SECONDS.toMillis(60); // 60 Seconds
+  public static final String RM_DT_RENEWER_THREAD_RETRY_INTERVAL =
+      RM_PREFIX + "delegation-token-renewer.thread-retry-interval";
+  public static final long DEFAULT_RM_DT_RENEWER_THREAD_RETRY_INTERVAL =
+      TimeUnit.SECONDS.toMillis(60); // 60 Seconds
+  public static final String RM_DT_RENEWER_THREAD_RETRY_MAX_ATTEMPTS =
+      RM_PREFIX + "delegation-token-renewer.thread-retry-max-attempts";
+  public static final int DEFAULT_RM_DT_RENEWER_THREAD_RETRY_MAX_ATTEMPTS =
+      10;
+
   public static final String RECOVERY_ENABLED = RM_PREFIX + "recovery.enabled";
   public static final boolean DEFAULT_RM_RECOVERY_ENABLED = false;
 

+ 24 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml

@@ -957,6 +957,30 @@
     <value>86400000</value>
   </property>
 
+  <property>
+    <description>
+    RM DelegationTokenRenewer thread timeout
+    </description>
+    <name>yarn.resourcemanager.delegation-token-renewer.thread-timeout</name>
+    <value>60s</value>
+  </property>
+
+  <property>
+    <description>
+    Default maximum number of retries for each RM DelegationTokenRenewer thread
+    </description>
+    <name>yarn.resourcemanager.delegation-token-renewer.thread-retry-max-attempts</name>
+    <value>10</value>
+  </property>
+
+  <property>
+    <description>
+    Time interval between each RM DelegationTokenRenewer thread retry attempt
+    </description>
+    <name>yarn.resourcemanager.delegation-token-renewer.thread-retry-interval</name>
+    <value>60s</value>
+  </property>
+
   <property>
     <description>
     Thread pool size for RMApplicationHistoryWriter.

+ 141 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java

@@ -26,6 +26,7 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Date;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
@@ -36,10 +37,12 @@ import java.util.Timer;
 import java.util.TimerTask;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -115,6 +118,12 @@ public class DelegationTokenRenewer extends AbstractService {
   private boolean tokenKeepAliveEnabled;
   private boolean hasProxyUserPrivileges;
   private long credentialsValidTimeRemaining;
+  private long tokenRenewerThreadTimeout;
+  private long tokenRenewerThreadRetryInterval;
+  private int tokenRenewerThreadRetryMaxAttempts;
+  private final Map<DelegationTokenRenewerEvent, Future<?>> futures =
+      new HashMap<>();
+  private boolean delegationTokenRenewerPoolTrackerFlag = true;
 
   // this config is supposedly not used by end-users.
   public static final String RM_SYSTEM_CREDENTIALS_VALID_TIME_REMAINING =
@@ -140,6 +149,17 @@ public class DelegationTokenRenewer extends AbstractService {
     this.credentialsValidTimeRemaining =
         conf.getLong(RM_SYSTEM_CREDENTIALS_VALID_TIME_REMAINING,
           DEFAULT_RM_SYSTEM_CREDENTIALS_VALID_TIME_REMAINING);
+    tokenRenewerThreadTimeout =
+        conf.getTimeDuration(YarnConfiguration.RM_DT_RENEWER_THREAD_TIMEOUT,
+            YarnConfiguration.DEFAULT_RM_DT_RENEWER_THREAD_TIMEOUT,
+            TimeUnit.MILLISECONDS);
+    tokenRenewerThreadRetryInterval = conf.getTimeDuration(
+        YarnConfiguration.RM_DT_RENEWER_THREAD_RETRY_INTERVAL,
+        YarnConfiguration.DEFAULT_RM_DT_RENEWER_THREAD_RETRY_INTERVAL,
+        TimeUnit.MILLISECONDS);
+    tokenRenewerThreadRetryMaxAttempts =
+        conf.getInt(YarnConfiguration.RM_DT_RENEWER_THREAD_RETRY_MAX_ATTEMPTS,
+            YarnConfiguration.DEFAULT_RM_DT_RENEWER_THREAD_RETRY_MAX_ATTEMPTS);
     setLocalSecretManagerAndServiceAddr();
     renewerService = createNewThreadPoolService(conf);
     pendingEventQueue = new LinkedBlockingQueue<DelegationTokenRenewerEvent>();
@@ -184,6 +204,11 @@ public class DelegationTokenRenewer extends AbstractService {
     serviceStateLock.writeLock().lock();
     isServiceStarted = true;
     serviceStateLock.writeLock().unlock();
+
+    if (delegationTokenRenewerPoolTrackerFlag) {
+      renewerService.submit(new DelegationTokenRenewerPoolTracker());
+    }
+
     while(!pendingEventQueue.isEmpty()) {
       processDelegationTokenRenewerEvent(pendingEventQueue.take());
     }
@@ -195,7 +220,9 @@ public class DelegationTokenRenewer extends AbstractService {
     serviceStateLock.readLock().lock();
     try {
       if (isServiceStarted) {
-        renewerService.execute(new DelegationTokenRenewerRunnable(evt));
+        Future<?> future =
+            renewerService.submit(new DelegationTokenRenewerRunnable(evt));
+        futures.put(evt, future);
       } else {
         pendingEventQueue.add(evt);
       }
@@ -476,7 +503,8 @@ public class DelegationTokenRenewer extends AbstractService {
               for (Iterator<Map.Entry<String, String>> itor =
                    tokenConf.iterator(); itor.hasNext(); ) {
                 Map.Entry<String, String> entry = itor.next();
-                LOG.info(entry.getKey() + " ===> " + entry.getValue());
+                LOG.debug("Token conf key is {} and value is {}",
+                    entry.getKey(), entry.getValue());
               }
             }
           }  else {
@@ -894,7 +922,100 @@ public class DelegationTokenRenewer extends AbstractService {
   public void setRMContext(RMContext rmContext) {
     this.rmContext = rmContext;
   }
-  
+
+  @VisibleForTesting
+  public void setDelegationTokenRenewerPoolTracker(boolean flag) {
+    delegationTokenRenewerPoolTrackerFlag = flag;
+  }
+
+  /**
+   * Create a timer task to retry the token renewer event which would be
+   * scheduled at defined intervals based on the configuration.
+   *
+   * @param evt
+   * @return Timer Task
+   */
+  private TimerTask getTimerTask(AbstractDelegationTokenRenewerAppEvent evt) {
+    return new TimerTask() {
+      @Override
+      public void run() {
+        LOG.info("Retrying token renewer thread for appid = {} and "
+            + "attempt is {}", evt.getApplicationId(),
+            evt.getAttempt());
+        evt.incrAttempt();
+
+        Collection<Token<?>> tokens =
+            evt.getCredentials().getAllTokens();
+        for (Token<?> token : tokens) {
+          DelegationTokenToRenew dttr = allTokens.get(token);
+          if (dttr != null) {
+            removeFailedDelegationToken(dttr);
+          }
+        }
+
+        DelegationTokenRenewerAppRecoverEvent event =
+            new DelegationTokenRenewerAppRecoverEvent(
+                evt.getApplicationId(), evt.getCredentials(),
+                evt.shouldCancelAtEnd(), evt.getUser(), evt.getTokenConf());
+        event.setAttempt(evt.getAttempt());
+        processDelegationTokenRenewerEvent(event);
+      }
+    };
+  }
+
+  /**
+   * Runnable class to set timeout for futures of all threads running in
+   * renewerService thread pool executor asynchronously.
+   *
+   * In case of timeout exception, retries would be attempted with defined
+   * intervals till no. of retry attempt reaches max attempt.
+   */
+  private final class DelegationTokenRenewerPoolTracker
+      implements Runnable {
+
+    DelegationTokenRenewerPoolTracker() {
+    }
+
+    /**
+     * Keep traversing <Future> of renewer pool threads and wait for specific
+     * timeout. In case of timeout exception, retry the event till no. of
+     * attempts reaches max attempts with specific interval.
+     */
+    @Override
+    public void run() {
+      while (true) {
+        for (Map.Entry<DelegationTokenRenewerEvent, Future<?>> entry : futures
+            .entrySet()) {
+          DelegationTokenRenewerEvent evt = entry.getKey();
+          Future<?> future = entry.getValue();
+          try {
+            future.get(tokenRenewerThreadTimeout, TimeUnit.MILLISECONDS);
+          } catch (TimeoutException e) {
+
+            // Cancel thread and retry the same event in case of timeout
+            if (future != null && !future.isDone() && !future.isCancelled()) {
+              future.cancel(true);
+              futures.remove(evt);
+              if (evt.getAttempt() < tokenRenewerThreadRetryMaxAttempts) {
+                renewalTimer.schedule(
+                    getTimerTask((AbstractDelegationTokenRenewerAppEvent) evt),
+                    tokenRenewerThreadRetryInterval);
+              } else {
+                LOG.info(
+                    "Exhausted max retry attempts {} in token renewer "
+                        + "thread for {}",
+                    tokenRenewerThreadRetryMaxAttempts, evt.getApplicationId());
+              }
+            }
+          } catch (Exception e) {
+            LOG.info("Problem in submitting renew tasks in token renewer "
+                + "thread.", e);
+          }
+        }
+      }
+    }
+  }
+
   /*
    * This will run as a separate thread and will process individual events. It
    * is done in this way to make sure that the token renewal as a part of
@@ -1016,6 +1137,10 @@ public class DelegationTokenRenewer extends AbstractService {
     public String getUser() {
       return user;
     }
+
+    private Configuration getTokenConf() {
+      return tokenConf;
+    }
   }
   
   enum DelegationTokenRenewerEventType {
@@ -1028,6 +1153,7 @@ public class DelegationTokenRenewer extends AbstractService {
       AbstractEvent<DelegationTokenRenewerEventType> {
 
     private ApplicationId appId;
+    private int attempt = 1;
 
     public DelegationTokenRenewerEvent(ApplicationId appId,
         DelegationTokenRenewerEventType type) {
@@ -1038,6 +1164,18 @@ public class DelegationTokenRenewer extends AbstractService {
     public ApplicationId getApplicationId() {
       return appId;
     }
+
+    public void incrAttempt() {
+      attempt++;
+    }
+
+    public int getAttempt() {
+      return attempt;
+    }
+
+    public void setAttempt(int attempt) {
+      this.attempt = attempt;
+    }
   }
 
   // only for testing

+ 176 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java

@@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.security;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.doAnswer;
@@ -42,6 +43,7 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.BrokenBarrierException;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -93,6 +95,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockRMAppSubmitter;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.TestRMRestart.TestSecurityMockRM;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
@@ -230,6 +233,7 @@ public class TestDelegationTokenRenewer {
     InetSocketAddress sockAddr =
         InetSocketAddress.createUnresolved("localhost", 1234);
     when(mockClientRMService.getBindAddress()).thenReturn(sockAddr);
+    delegationTokenRenewer.setDelegationTokenRenewerPoolTracker(false);
     delegationTokenRenewer.setRMContext(mockContext);
     delegationTokenRenewer.init(conf);
     delegationTokenRenewer.start();
@@ -632,6 +636,7 @@ public class TestDelegationTokenRenewer {
     InetSocketAddress sockAddr =
         InetSocketAddress.createUnresolved("localhost", 1234);
     when(mockClientRMService.getBindAddress()).thenReturn(sockAddr);
+    localDtr.setDelegationTokenRenewerPoolTracker(false);
     localDtr.setRMContext(mockContext);
     localDtr.init(lconf);
     localDtr.start();
@@ -712,6 +717,7 @@ public class TestDelegationTokenRenewer {
     InetSocketAddress sockAddr =
         InetSocketAddress.createUnresolved("localhost", 1234);
     when(mockClientRMService.getBindAddress()).thenReturn(sockAddr);
+    localDtr.setDelegationTokenRenewerPoolTracker(false);
     localDtr.setRMContext(mockContext);
     localDtr.init(lconf);
     localDtr.start();
@@ -1612,4 +1618,173 @@ public class TestDelegationTokenRenewer {
     // Ensure incrTokenSequenceNo has been called for token renewal as well.
     Mockito.verify(mockContext, Mockito.times(2)).incrTokenSequenceNo();
   }
-}
+
+  /**
+   * Test case to ensure token renewer threads are timed out by inducing
+   * artificial delay.
+   *
+   * Because of time out, retries would be attempted till it reaches max retry
+   * attempt and finally asserted using used threads count.
+   *
+   * @throws Exception
+   */
+  @Test(timeout = 30000)
+  public void testTokenThreadTimeout() throws Exception {
+    Configuration yarnConf = new YarnConfiguration();
+    yarnConf.setBoolean(YarnConfiguration.RM_PROXY_USER_PRIVILEGES_ENABLED,
+        true);
+    yarnConf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
+        "kerberos");
+    yarnConf.setClass(YarnConfiguration.RM_STORE, MemoryRMStateStore.class,
+        RMStateStore.class);
+    yarnConf.setTimeDuration(YarnConfiguration.RM_DT_RENEWER_THREAD_TIMEOUT, 5,
+        TimeUnit.SECONDS);
+    yarnConf.setTimeDuration(
+        YarnConfiguration.RM_DT_RENEWER_THREAD_RETRY_INTERVAL, 5,
+        TimeUnit.SECONDS);
+    yarnConf.setInt(YarnConfiguration.RM_DT_RENEWER_THREAD_RETRY_MAX_ATTEMPTS,
+        3);
+    UserGroupInformation.setConfiguration(yarnConf);
+
+    Text userText = new Text("user1");
+    DelegationTokenIdentifier dtId = new DelegationTokenIdentifier(userText,
+        new Text("renewer1"), userText);
+    final Token<DelegationTokenIdentifier> originalToken =
+        new Token<>(dtId.getBytes(), "password1".getBytes(), dtId.getKind(),
+            new Text("service1"));
+
+    Credentials credentials = new Credentials();
+    credentials.addToken(userText, originalToken);
+
+    AtomicBoolean renewDelay = new AtomicBoolean(false);
+
+    // -1 is because of thread allocated to pool tracker runnable tasks
+    AtomicInteger threadCounter = new AtomicInteger(-1);
+    renewDelay.set(true);
+    DelegationTokenRenewer renewer = createNewDelegationTokenRenewerForTimeout(
+        yarnConf, threadCounter, renewDelay);
+
+    MockRM rm = new TestSecurityMockRM(yarnConf) {
+      @Override
+      protected DelegationTokenRenewer createDelegationTokenRenewer() {
+        return renewer;
+      }
+    };
+
+    rm.start();
+    rm.submitApp(200, "name", "user",
+        new HashMap<ApplicationAccessType, String>(), false, "default", 1,
+        credentials);
+
+    int attempts = yarnConf.getInt(
+        YarnConfiguration.RM_DT_RENEWER_THREAD_RETRY_MAX_ATTEMPTS,
+        YarnConfiguration.DEFAULT_RM_DT_RENEWER_THREAD_RETRY_MAX_ATTEMPTS);
+
+    GenericTestUtils.waitFor(() -> threadCounter.get() >= attempts, 2000,
+        30000);
+
+    // Ensure no. of threads has been used in renewer service thread pool is
+    // higher than the configured max retry attempts
+    assertTrue(threadCounter.get() >= attempts);
+    rm.close();
+  }
+
+  /**
+   * Test case to ensure token renewer threads are running as usual and finally
+   * asserted only 1 thread has been used.
+   *
+   * @throws Exception
+   */
+  @Test(timeout = 30000)
+  public void testTokenThreadTimeoutWithoutDelay() throws Exception {
+    Configuration yarnConf = new YarnConfiguration();
+    yarnConf.setBoolean(YarnConfiguration.RM_PROXY_USER_PRIVILEGES_ENABLED,
+        true);
+    yarnConf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
+        "kerberos");
+    yarnConf.set(YarnConfiguration.RM_STORE,
+        MemoryRMStateStore.class.getName());
+    yarnConf.setTimeDuration(YarnConfiguration.RM_DT_RENEWER_THREAD_TIMEOUT, 5,
+        TimeUnit.SECONDS);
+    yarnConf.setTimeDuration(
+        YarnConfiguration.RM_DT_RENEWER_THREAD_RETRY_INTERVAL, 5,
+        TimeUnit.SECONDS);
+    yarnConf.setInt(YarnConfiguration.RM_DT_RENEWER_THREAD_RETRY_MAX_ATTEMPTS,
+        3);
+    UserGroupInformation.setConfiguration(yarnConf);
+
+    Text userText = new Text("user1");
+    DelegationTokenIdentifier dtId = new DelegationTokenIdentifier(userText,
+        new Text("renewer1"), userText);
+    final Token<DelegationTokenIdentifier> originalToken =
+        new Token<>(dtId.getBytes(), "password1".getBytes(), dtId.getKind(),
+            new Text("service1"));
+
+    Credentials credentials = new Credentials();
+    credentials.addToken(userText, originalToken);
+
+    AtomicBoolean renewDelay = new AtomicBoolean(false);
+
+    // -1 is because of thread allocated to pool tracker runnable tasks
+    AtomicInteger threadCounter = new AtomicInteger(-1);
+    DelegationTokenRenewer renwer = createNewDelegationTokenRenewerForTimeout(
+        yarnConf, threadCounter, renewDelay);
+
+    MockRM rm = new TestSecurityMockRM(yarnConf) {
+      @Override
+      protected DelegationTokenRenewer createDelegationTokenRenewer() {
+        return renwer;
+      }
+    };
+
+    rm.start();
+    rm.submitApp(200, "name", "user",
+        new HashMap<ApplicationAccessType, String>(), false, "default", 1,
+        credentials);
+
+    GenericTestUtils.waitFor(() -> threadCounter.get() == 1, 2000, 40000);
+
+    // Ensure only one thread has been used in renewer service thread pool.
+    assertEquals(threadCounter.get(), 1);
+    rm.close();
+  }
+
+  private DelegationTokenRenewer createNewDelegationTokenRenewerForTimeout(
+      Configuration config, final AtomicInteger renewerCounter,
+      final AtomicBoolean renewDelay) {
+    DelegationTokenRenewer renew = new DelegationTokenRenewer() {
+      @Override
+      protected ThreadPoolExecutor createNewThreadPoolService(
+          Configuration configuration) {
+        ThreadPoolExecutor pool = new ThreadPoolExecutor(5, 5, 3L,
+            TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>()) {
+          @Override
+          public Future<?> submit(Runnable r) {
+            renewerCounter.incrementAndGet();
+            return super.submit(r);
+          }
+        };
+        return pool;
+      }
+
+      @Override
+      protected void renewToken(final DelegationTokenToRenew dttr)
+          throws IOException {
+        try {
+          if (renewDelay.get()) {
+            // Delay for 4 times than the configured timeout
+            Thread.sleep(config.getTimeDuration(
+                YarnConfiguration.RM_DT_RENEWER_THREAD_TIMEOUT,
+                YarnConfiguration.DEFAULT_RM_DT_RENEWER_THREAD_TIMEOUT,
+                TimeUnit.MILLISECONDS) * 4);
+          }
+          super.renewToken(dttr);
+        } catch (InterruptedException e) {
+          LOG.info("Sleep Interrupted", e);
+        }
+      }
+    };
+    renew.setDelegationTokenRenewerPoolTracker(true);
+    return renew;
+  }
+}