Prechádzať zdrojové kódy

YARN-11332. [Federation] Improve FederationClientInterceptor#ThreadPool thread pool configuration. (#4982)

slfan1989 2 rokov pred
rodič
commit
070a2d4880

+ 76 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java

@@ -4136,11 +4136,87 @@ public class YarnConfiguration extends Configuration {
 
   public static final String ROUTER_WEBAPP_PREFIX = ROUTER_PREFIX + "webapp.";
 
+  /**
+   * This configurable that controls the thread pool size of the threadpool of the interceptor.
+   * The corePoolSize(minimumPoolSize) and maximumPoolSize of the thread pool
+   * are controlled by this configurable.
+   * In order to control the thread pool more accurately, this parameter is deprecated.
+   *
+   * corePoolSize(minimumPoolSize) use
+   * {@link YarnConfiguration#ROUTER_USER_CLIENT_THREAD_POOL_MINIMUM_POOL_SIZE}
+   *
+   * maximumPoolSize use
+   * {@link YarnConfiguration#ROUTER_USER_CLIENT_THREAD_POOL_MAXIMUM_POOL_SIZE}
+   *
+   * This configurable will be deprecated.
+   */
   public static final String ROUTER_USER_CLIENT_THREADS_SIZE =
       ROUTER_PREFIX + "interceptor.user.threadpool-size";
 
+  /**
+   * The default value is 5.
+   * which means that the corePoolSize(minimumPoolSize) and maximumPoolSize
+   * of the thread pool are both 5s.
+   *
+   * corePoolSize(minimumPoolSize) default value use
+   * {@link YarnConfiguration#DEFAULT_ROUTER_USER_CLIENT_THREAD_POOL_MINIMUM_POOL_SIZE}
+   *
+   * maximumPoolSize default value use
+   * {@link YarnConfiguration#DEFAULT_ROUTER_USER_CLIENT_THREAD_POOL_MAXIMUM_POOL_SIZE}
+   */
   public static final int DEFAULT_ROUTER_USER_CLIENT_THREADS_SIZE = 5;
 
+  /**
+   * This configurable is used to set the corePoolSize(minimumPoolSize)
+   * of the thread pool of the interceptor.
+   *
+   * corePoolSize the number of threads to keep in the pool, even if they are idle.
+   */
+  public static final String ROUTER_USER_CLIENT_THREAD_POOL_MINIMUM_POOL_SIZE =
+      ROUTER_PREFIX + "interceptor.user-thread-pool.minimum-pool-size";
+
+  /**
+   * This configuration is used to set the default value of corePoolSize (minimumPoolSize)
+   * of the thread pool of the interceptor.
+   *
+   * Default is 5.
+   */
+  public static final int DEFAULT_ROUTER_USER_CLIENT_THREAD_POOL_MINIMUM_POOL_SIZE = 5;
+
+  /**
+   * This configurable is used to set the maximumPoolSize of the thread pool of the interceptor.
+   *
+   * maximumPoolSize the maximum number of threads to allow in the pool.
+   */
+  public static final String ROUTER_USER_CLIENT_THREAD_POOL_MAXIMUM_POOL_SIZE =
+      ROUTER_PREFIX + "interceptor.user-thread-pool.maximum-pool-size";
+
+  /**
+   * This configuration is used to set the default value of maximumPoolSize
+   * of the thread pool of the interceptor.
+   *
+   * Default is 5.
+   */
+  public static final int DEFAULT_ROUTER_USER_CLIENT_THREAD_POOL_MAXIMUM_POOL_SIZE = 5;
+
+  /**
+   * This configurable is used to set the keepAliveTime of the thread pool of the interceptor.
+   *
+   * keepAliveTime when the number of threads is greater than the core,
+   * this is the maximum time that excess idle threads will wait for new tasks before terminating.
+   */
+  public static final String ROUTER_USER_CLIENT_THREAD_POOL_KEEP_ALIVE_TIME =
+      ROUTER_PREFIX + "interceptor.user-thread-pool.keep-alive-time";
+
+  /**
+   * This configurable is used to set the default time of keepAliveTime
+   * of the thread pool of the interceptor.
+   *
+   * the default value is 0s.
+   */
+  public static final long DEFAULT_ROUTER_USER_CLIENT_THREAD_POOL_KEEP_ALIVE_TIME =
+      TimeUnit.SECONDS.toMillis(0); // 0s
+
   /** The address of the Router web application. */
   public static final String ROUTER_WEBAPP_ADDRESS =
       ROUTER_WEBAPP_PREFIX + "address";

+ 29 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml

@@ -5047,6 +5047,35 @@
     </description>
   </property>
 
+  <property>
+    <name>yarn.router.interceptor.user-thread-pool.minimum-pool-size</name>
+    <value>5</value>
+    <description>
+      This configurable is used to set the corePoolSize(minimumPoolSize)
+      of the thread pool of the interceptor.
+      Default is 5.
+    </description>
+  </property>
+
+  <property>
+    <name>yarn.router.interceptor.user-thread-pool.maximum-pool-size</name>
+    <value>5</value>
+    <description>
+      This configuration is used to set the default value of maximumPoolSize
+      of the thread pool of the interceptor.
+      Default is 5.
+    </description>
+  </property>
+
+  <property>
+    <name>yarn.router.interceptor.user-thread-pool.keep-alive-time</name>
+    <value>0s</value>
+    <description>
+      This configurable is used to set the keepAliveTime of the thread pool of the interceptor.
+      Default is 0s.
+    </description>
+  </property>
+
   <property>
     <name>yarn.router.submit.interval.time</name>
     <value>10ms</value>

+ 51 - 6
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java

@@ -187,15 +187,20 @@ public class FederationClientInterceptor
     federationFacade = FederationStateStoreFacade.getInstance();
     rand = new Random(System.currentTimeMillis());
 
-    int numThreads = getConf().getInt(
-        YarnConfiguration.ROUTER_USER_CLIENT_THREADS_SIZE,
-        YarnConfiguration.DEFAULT_ROUTER_USER_CLIENT_THREADS_SIZE);
+    int numMinThreads = getNumMinThreads(getConf());
+
+    int numMaxThreads = getNumMaxThreads(getConf());
+
+    long keepAliveTime = getConf().getTimeDuration(
+        YarnConfiguration.ROUTER_USER_CLIENT_THREAD_POOL_KEEP_ALIVE_TIME,
+        YarnConfiguration.DEFAULT_ROUTER_USER_CLIENT_THREAD_POOL_KEEP_ALIVE_TIME, TimeUnit.SECONDS);
+
     ThreadFactory threadFactory = new ThreadFactoryBuilder()
         .setNameFormat("RPC Router Client-" + userName + "-%d ").build();
 
-    BlockingQueue workQueue = new LinkedBlockingQueue<>();
-    this.executorService = new ThreadPoolExecutor(numThreads, numThreads,
-        0L, TimeUnit.MILLISECONDS, workQueue, threadFactory);
+    BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>();
+    this.executorService = new ThreadPoolExecutor(numMinThreads, numMaxThreads,
+        keepAliveTime, TimeUnit.MILLISECONDS, workQueue, threadFactory);
 
     final Configuration conf = this.getConf();
 
@@ -1949,6 +1954,46 @@ public class FederationClientInterceptor
     }
   }
 
+  protected int getNumMinThreads(Configuration conf) {
+
+    String threadSize = conf.get(YarnConfiguration.ROUTER_USER_CLIENT_THREADS_SIZE);
+
+    // If the user configures YarnConfiguration.ROUTER_USER_CLIENT_THREADS_SIZE,
+    // we will still get the number of threads from this configuration.
+    if (StringUtils.isNotBlank(threadSize)) {
+      LOG.warn("{} is a deprecated property, " +
+          "please remove it, use {} to configure the minimum number of thread pool.",
+          YarnConfiguration.ROUTER_USER_CLIENT_THREADS_SIZE,
+          YarnConfiguration.ROUTER_USER_CLIENT_THREAD_POOL_MINIMUM_POOL_SIZE);
+      return Integer.parseInt(threadSize);
+    }
+
+    int numMinThreads = conf.getInt(
+        YarnConfiguration.ROUTER_USER_CLIENT_THREAD_POOL_MINIMUM_POOL_SIZE,
+        YarnConfiguration.DEFAULT_ROUTER_USER_CLIENT_THREAD_POOL_MINIMUM_POOL_SIZE);
+    return numMinThreads;
+  }
+
+  protected int getNumMaxThreads(Configuration conf) {
+
+    String threadSize = conf.get(YarnConfiguration.ROUTER_USER_CLIENT_THREADS_SIZE);
+
+    // If the user configures YarnConfiguration.ROUTER_USER_CLIENT_THREADS_SIZE,
+    // we will still get the number of threads from this configuration.
+    if (StringUtils.isNotBlank(threadSize)) {
+      LOG.warn("{} is a deprecated property, " +
+          "please remove it, use {} to configure the maximum number of thread pool.",
+          YarnConfiguration.ROUTER_USER_CLIENT_THREADS_SIZE,
+          YarnConfiguration.ROUTER_USER_CLIENT_THREAD_POOL_MAXIMUM_POOL_SIZE);
+      return Integer.parseInt(threadSize);
+    }
+
+    int numMaxThreads = conf.getInt(
+        YarnConfiguration.ROUTER_USER_CLIENT_THREAD_POOL_MAXIMUM_POOL_SIZE,
+        YarnConfiguration.DEFAULT_ROUTER_USER_CLIENT_THREAD_POOL_MAXIMUM_POOL_SIZE);
+    return numMaxThreads;
+  }
+
   @VisibleForTesting
   public void setNumSubmitRetries(int numSubmitRetries) {
     this.numSubmitRetries = numSubmitRetries;

+ 30 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java

@@ -1520,4 +1520,34 @@ public class TestFederationClientInterceptor extends BaseRouterClientRMTest {
     return ReservationDefinition.newInstance(arrival, deadline,
         requests, username, "0", Priority.UNDEFINED);
   }
+
+  @Test
+  public void testGetNumMinThreads() {
+    // If we don't configure YarnConfiguration.ROUTER_USER_CLIENT_THREAD_POOL_MINIMUM_POOL_SIZE,
+    // we expect to get 5 threads
+    int minThreads = interceptor.getNumMinThreads(this.getConf());
+    Assert.assertEquals(5, minThreads);
+
+    // If we configure YarnConfiguration.ROUTER_USER_CLIENT_THREAD_POOL_MINIMUM_POOL_SIZE,
+    // we expect to get 3 threads
+    this.getConf().unset(YarnConfiguration.ROUTER_USER_CLIENT_THREADS_SIZE);
+    this.getConf().setInt(YarnConfiguration.ROUTER_USER_CLIENT_THREAD_POOL_MINIMUM_POOL_SIZE, 3);
+    int minThreads2 = interceptor.getNumMinThreads(this.getConf());
+    Assert.assertEquals(3, minThreads2);
+  }
+
+  @Test
+  public void testGetNumMaxThreads() {
+    // If we don't configure YarnConfiguration.ROUTER_USER_CLIENT_THREAD_POOL_MAXIMUM_POOL_SIZE,
+    // we expect to get 5 threads
+    int minThreads = interceptor.getNumMaxThreads(this.getConf());
+    Assert.assertEquals(5, minThreads);
+
+    // If we configure YarnConfiguration.ROUTER_USER_CLIENT_THREAD_POOL_MAXIMUM_POOL_SIZE,
+    // we expect to get 8 threads
+    this.getConf().unset(YarnConfiguration.ROUTER_USER_CLIENT_THREADS_SIZE);
+    this.getConf().setInt(YarnConfiguration.ROUTER_USER_CLIENT_THREAD_POOL_MAXIMUM_POOL_SIZE, 8);
+    int minThreads2 = interceptor.getNumMaxThreads(this.getConf());
+    Assert.assertEquals(8, minThreads2);
+  }
 }