Explorar o código

YARN-11588. [Federation] [Addendum] Fix uncleaned threads in yarn router thread pool executor. (#6222)

slfan1989 hai 1 ano
pai
achega
652908519e

+ 16 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java

@@ -4369,6 +4369,22 @@ public class YarnConfiguration extends Configuration {
   public static final long DEFAULT_ROUTER_USER_CLIENT_THREAD_POOL_KEEP_ALIVE_TIME =
       TimeUnit.SECONDS.toMillis(0); // 0s
 
+  /**
+   * This method configures the policy for core threads regarding termination
+   * when no tasks arrive within the keep-alive time.
+   * When set to false, core threads are never terminated due to a lack of tasks.
+   * When set to true, the same keep-alive policy
+   * that applies to non-core threads also applies to core threads.
+   * To prevent constant thread replacement,
+   * ensure that the keep-alive time is greater than zero when setting it to true.
+   * It's advisable to call this method before the pool becomes actively used.
+   */
+  public static final String ROUTER_USER_CLIENT_THREAD_POOL_ALLOW_CORE_THREAD_TIMEOUT =
+      ROUTER_PREFIX + "interceptor.user-thread-pool.allow-core-thread-time-out";
+
+  public static final boolean DEFAULT_ROUTER_USER_CLIENT_THREAD_POOL_ALLOW_CORE_THREAD_TIMEOUT =
+      false;
+
   /** The address of the Router web application. */
   public static final String ROUTER_WEBAPP_ADDRESS =
       ROUTER_WEBAPP_PREFIX + "address";

+ 17 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml

@@ -5139,6 +5139,23 @@
     </description>
   </property>
 
+  <property>
+    <name>yarn.router.interceptor.user-thread-pool.allow-core-thread-time-out</name>
+    <value>false</value>
+    <description>
+      This method configures the policy for core threads regarding termination
+      when no tasks arrive within the keep-alive time.
+      When set to false, core threads are never terminated due to a lack of tasks.
+      When set to true, the same keep-alive policy
+      that applies to non-core threads also applies to core threads.
+      To prevent constant thread replacement,
+      ensure that the keep-alive time is greater than zero when setting it to true.
+      It's advisable to call this method before the pool becomes actively used.
+      We need to ensure that
+        yarn.router.interceptor.user-thread-pool.keep-alive-time is greater than 0.
+    </description>
+  </property>
+
   <property>
     <name>yarn.router.submit.interval.time</name>
     <value>10ms</value>

+ 7 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java

@@ -231,7 +231,13 @@ public class FederationClientInterceptor
         keepAliveTime, TimeUnit.MILLISECONDS, workQueue, threadFactory);
 
     // Adding this line so that unused user threads will exit and be cleaned up if idle for too long
-    this.executorService.allowCoreThreadTimeOut(true);
+    boolean allowCoreThreadTimeOut =  getConf().getBoolean(
+        YarnConfiguration.ROUTER_USER_CLIENT_THREAD_POOL_ALLOW_CORE_THREAD_TIMEOUT,
+        YarnConfiguration.DEFAULT_ROUTER_USER_CLIENT_THREAD_POOL_ALLOW_CORE_THREAD_TIMEOUT);
+
+    if (keepAliveTime > 0 && allowCoreThreadTimeOut) {
+      this.executorService.allowCoreThreadTimeOut(allowCoreThreadTimeOut);
+    }
 
     final Configuration conf = this.getConf();
 

+ 13 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/FederationRMAdminInterceptor.java

@@ -130,9 +130,21 @@ public class FederationRMAdminInterceptor extends AbstractRMAdminRequestIntercep
     ThreadFactory threadFactory = new ThreadFactoryBuilder()
         .setNameFormat("RPC Router RMAdminClient-" + userName + "-%d ").build();
 
+    long keepAliveTime = getConf().getTimeDuration(
+        YarnConfiguration.ROUTER_USER_CLIENT_THREAD_POOL_KEEP_ALIVE_TIME,
+        YarnConfiguration.DEFAULT_ROUTER_USER_CLIENT_THREAD_POOL_KEEP_ALIVE_TIME, TimeUnit.SECONDS);
+
     BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>();
     this.executorService = new ThreadPoolExecutor(numThreads, numThreads,
-        0L, TimeUnit.MILLISECONDS, workQueue, threadFactory);
+        keepAliveTime, TimeUnit.MILLISECONDS, workQueue, threadFactory);
+
+    boolean allowCoreThreadTimeOut =  getConf().getBoolean(
+        YarnConfiguration.ROUTER_USER_CLIENT_THREAD_POOL_ALLOW_CORE_THREAD_TIMEOUT,
+        YarnConfiguration.DEFAULT_ROUTER_USER_CLIENT_THREAD_POOL_ALLOW_CORE_THREAD_TIMEOUT);
+
+    if (keepAliveTime > 0 && allowCoreThreadTimeOut) {
+      this.executorService.allowCoreThreadTimeOut(allowCoreThreadTimeOut);
+    }
 
     federationFacade = FederationStateStoreFacade.getInstance(this.getConf());
     this.conf = this.getConf();