Browse Source

HDFS-17766. [ARR] Avoid initializing unused threadPool in RouterAsyncRpcClient. (#7561). Contributed by hfutatzhanghb.

Reviewed-by: Jian Zhang <keepromise@apache.org>
hfutatzhanghb 1 month ago
parent
commit
32929ebd16

+ 35 - 23
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java

@@ -128,7 +128,7 @@ public class RouterRpcClient {
   /** Connection pool to the Namenodes per user for performance. */
   private final ConnectionManager connectionManager;
   /** Service to run asynchronous calls. */
-  private final ThreadPoolExecutor executorService;
+  private ThreadPoolExecutor executorService;
   /** Retry policy for router -> NN communication. */
   private final RetryPolicy retryPolicy;
   /** Optional perf monitor. */
@@ -184,24 +184,7 @@ public class RouterRpcClient {
     this.connectionManager.start();
     this.routerRpcFairnessPolicyController =
         FederationUtil.newFairnessPolicyController(conf);
-
-    int numThreads = conf.getInt(
-        RBFConfigKeys.DFS_ROUTER_CLIENT_THREADS_SIZE,
-        RBFConfigKeys.DFS_ROUTER_CLIENT_THREADS_SIZE_DEFAULT);
-    ThreadFactory threadFactory = new ThreadFactoryBuilder()
-        .setNameFormat("RPC Router Client-%d")
-        .build();
-    BlockingQueue<Runnable> workQueue;
-    if (conf.getBoolean(
-        RBFConfigKeys.DFS_ROUTER_CLIENT_REJECT_OVERLOAD,
-        RBFConfigKeys.DFS_ROUTER_CLIENT_REJECT_OVERLOAD_DEFAULT)) {
-      workQueue = new ArrayBlockingQueue<>(numThreads);
-    } else {
-      workQueue = new LinkedBlockingQueue<>();
-    }
-    this.executorService = new ThreadPoolExecutor(numThreads, numThreads,
-        0L, TimeUnit.MILLISECONDS, workQueue, threadFactory);
-
+    initConcurrentCallExecutorService(conf);
     this.rpcMonitor = monitor;
 
     int maxFailoverAttempts = conf.getInt(
@@ -245,6 +228,25 @@ public class RouterRpcClient {
     this.lastActiveNNRefreshTimes = new ConcurrentHashMap<>();
   }
 
+  protected void initConcurrentCallExecutorService(Configuration conf) {
+    int numThreads = conf.getInt(
+        RBFConfigKeys.DFS_ROUTER_CLIENT_THREADS_SIZE,
+        RBFConfigKeys.DFS_ROUTER_CLIENT_THREADS_SIZE_DEFAULT);
+    ThreadFactory threadFactory = new ThreadFactoryBuilder()
+        .setNameFormat("RPC Router Client-%d")
+        .build();
+    BlockingQueue<Runnable> workQueue;
+    if (conf.getBoolean(
+        RBFConfigKeys.DFS_ROUTER_CLIENT_REJECT_OVERLOAD,
+        RBFConfigKeys.DFS_ROUTER_CLIENT_REJECT_OVERLOAD_DEFAULT)) {
+      workQueue = new ArrayBlockingQueue<>(numThreads);
+    } else {
+      workQueue = new LinkedBlockingQueue<>();
+    }
+    this.executorService = new ThreadPoolExecutor(numThreads, numThreads,
+        0L, TimeUnit.MILLISECONDS, workQueue, threadFactory);
+  }
+
   /**
    * Get the configuration for the RPC client. It takes the Router
    * configuration and transforms it into regular RPC Client configuration.
@@ -278,6 +280,15 @@ public class RouterRpcClient {
     return this.namenodeResolver;
   }
 
+  /**
+   * Get the executor service used by invoking concurrent calls.
+   * @return the executor service.
+   */
+  @VisibleForTesting
+  public ThreadPoolExecutor getExecutorService() {
+    return executorService;
+  }
+
   /**
    * Shutdown the client.
    */
@@ -364,9 +375,11 @@ public class RouterRpcClient {
    */
   public String getAsyncCallerPoolJson() {
     final Map<String, Integer> info = new LinkedHashMap<>();
-    info.put("active", executorService.getActiveCount());
-    info.put("total", executorService.getPoolSize());
-    info.put("max", executorService.getMaximumPoolSize());
+    if (executorService != null) {
+      info.put("active", executorService.getActiveCount());
+      info.put("total", executorService.getPoolSize());
+      info.put("max", executorService.getMaximumPoolSize());
+    }
     return JSON.toString(info);
   }
 
@@ -2027,7 +2040,6 @@ public class RouterRpcClient {
     return isUnavailableException(ioe);
   }
 
-
   /**
    * The {@link  ExecutionStatus} class is a utility class used to track the status of
    * execution operations performed by the {@link  RouterRpcClient}.

+ 5 - 0
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncRpcClient.java

@@ -116,6 +116,11 @@ public class RouterAsyncRpcClient extends RouterRpcClient{
     this.rpcMonitor = monitor;
   }
 
+  @Override
+  protected void initConcurrentCallExecutorService(Configuration conf) {
+    // No need to initialize the thread pool for concurrent call.
+  }
+
   /**
    * Invoke method in all locations and return success if any succeeds.
    *

+ 5 - 0
hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java

@@ -2426,4 +2426,9 @@ public class TestRouterRpc {
     // The audit log should not contain async:true.
     assertFalse(auditLog.getOutput().contains("async:true"));
   }
+
+  @Test
+  public void testConcurrentCallExecutorInitial() {
+    assertNotNull(router.getRouterRpcClient().getExecutorService());
+  }
 }

+ 6 - 0
hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncRpc.java

@@ -36,6 +36,7 @@ import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_
 import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FAIRNESS_POLICY_CONTROLLER_CLASS;
 import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.syncReturn;
 import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
 
 /**
  * Testing the asynchronous RPC functionality of the router.
@@ -83,4 +84,9 @@ public class TestRouterAsyncRpc extends TestRouterRpc {
     assertArrayEquals(group, result);
   }
 
+  @Test
+  @Override
+  public void testConcurrentCallExecutorInitial() {
+    assertNull(rndRouter.getRouterRpcClient().getExecutorService());
+  }
 }

+ 8 - 0
hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncRpcMultiDestination.java

@@ -34,6 +34,7 @@ import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_
 import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FAIRNESS_POLICY_CONTROLLER_CLASS;
 import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.syncReturn;
 import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
 
 /**
  * Testing the asynchronous RPC functionality of the router with multiple mounts.
@@ -70,4 +71,11 @@ public class TestRouterAsyncRpcMultiDestination extends TestRouterRpcMultiDestin
     String[] result = syncReturn(String[].class);
     assertArrayEquals(group, result);
   }
+
+  @Test
+  @Override
+  public void testConcurrentCallExecutorInitial() {
+    MiniRouterDFSCluster.RouterContext rndRouter = super.getRouterContext();
+    assertNull(rndRouter.getRouterRpcClient().getExecutorService());
+  }
 }