Browse Source

HDFS-17651.[ARR] Async handler executor isolation (#7244). Contributed by hfutatzhanghb.

Reviewed-by: Jian Zhang <keepromise@apache.org>
Signed-off-by: He Xiaoqiao <hexiaoqiao@apache.org>
hfutatzhanghb 4 months ago
parent
commit
f66c89b657
13 changed files with 190 additions and 93 deletions
  1. 5 5
      hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/protocolPB/AsyncRpcProtocolPBUtil.java
  2. 16 9
      hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java
  3. 104 35
      hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
  4. 4 6
      hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncRpcClient.java
  5. 31 4
      hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml
  6. 1 1
      hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestAsyncRpcProtocolPBUtil.java
  7. 1 1
      hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestRouterClientSideTranslatorPB.java
  8. 6 7
      hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncProtocolTestBase.java
  9. 6 7
      hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncErasureCoding.java
  10. 6 7
      hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncQuota.java
  11. 2 2
      hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncRpc.java
  12. 6 7
      hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncRpcClient.java
  13. 2 2
      hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncRpcMultiDestination.java

+ 5 - 5
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/protocolPB/AsyncRpcProtocolPBUtil.java

@@ -55,7 +55,7 @@ import static org.apache.hadoop.ipc.internal.ShadedProtobufHelper.ipc;
 public final class AsyncRpcProtocolPBUtil {
   public static final Logger LOG = LoggerFactory.getLogger(AsyncRpcProtocolPBUtil.class);
   /** The executor used for handling responses asynchronously. */
-  private static Executor worker;
+  private static Executor asyncResponderExecutor;
 
   private AsyncRpcProtocolPBUtil() {}
 
@@ -97,7 +97,7 @@ public final class AsyncRpcProtocolPBUtil {
       } catch (Exception ex) {
         throw warpCompletionException(ex);
       }
-    }, worker));
+    }, asyncResponderExecutor));
     return asyncReturn(clazz);
   }
 
@@ -144,10 +144,10 @@ public final class AsyncRpcProtocolPBUtil {
    * Sets the executor used for handling responses asynchronously within
    * the utility class.
    *
-   * @param worker The executor to be used for handling responses asynchronously.
+   * @param asyncResponderExecutor The executor to be used for handling responses asynchronously.
    */
-  public static void setWorker(Executor worker) {
-    AsyncRpcProtocolPBUtil.worker = worker;
+  public static void setAsyncResponderExecutor(Executor asyncResponderExecutor) {
+    AsyncRpcProtocolPBUtil.asyncResponderExecutor = asyncResponderExecutor;
   }
 
   @FunctionalInterface

+ 16 - 9
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java

@@ -72,15 +72,22 @@ public class RBFConfigKeys extends CommonConfigurationKeysPublic {
   public static final String DFS_ROUTER_RPC_ENABLE =
       FEDERATION_ROUTER_PREFIX + "rpc.enable";
   public static final boolean DFS_ROUTER_RPC_ENABLE_DEFAULT = true;
-  public static final String DFS_ROUTER_RPC_ENABLE_ASYNC =
-      FEDERATION_ROUTER_PREFIX + "rpc.async.enable";
-  public static final boolean DFS_ROUTER_RPC_ENABLE_ASYNC_DEFAULT = false;
-  public static final String DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT =
-      FEDERATION_ROUTER_PREFIX + "rpc.async.handler.count";
-  public static final int DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT_DEFAULT = 2;
-  public static final String DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT =
-      FEDERATION_ROUTER_PREFIX + "rpc.async.responder.count";
-  public static final int DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT_DEFAULT = 10;
+  // HDFS Router Asynchronous RPC
+  public static final String DFS_ROUTER_ASYNC_RPC_ENABLE_KEY =
+      FEDERATION_ROUTER_PREFIX + "async.rpc.enable";
+  public static final boolean DFS_ROUTER_ASYNC_RPC_ENABLE_DEFAULT = false;
+  public static final String FEDERATION_ROUTER_ASYNC_RPC_PREFIX =
+          FEDERATION_ROUTER_PREFIX + "async.rpc.";
+  // Example: ns1:count1,ns2:count2,ns3:count3
+  public static final String DFS_ROUTER_ASYNC_RPC_NS_HANDLER_COUNT_KEY =
+          FEDERATION_ROUTER_ASYNC_RPC_PREFIX + "ns.handler.count";
+  public static final String DFS_ROUTER_ASYNC_RPC_NS_HANDLER_COUNT_DEFAULT = "";
+  public static final String DFS_ROUTER_ASYNC_RPC_HANDLER_COUNT_KEY =
+          FEDERATION_ROUTER_ASYNC_RPC_PREFIX + "handler.count";
+  public static final int DFS_ROUTER_ASYNC_RPC_HANDLER_COUNT_DEFAULT = 10;
+  public static final String DFS_ROUTER_ASYNC_RPC_RESPONDER_COUNT_KEY =
+          FEDERATION_ROUTER_ASYNC_RPC_PREFIX + "responder.count";
+  public static final int DFS_ROUTER_ASYNCRPC_RESPONDER_COUNT_DEFAULT = 10;
 
   public static final String DFS_ROUTER_METRICS_ENABLE =
       FEDERATION_ROUTER_PREFIX + "metrics.enable";

+ 104 - 35
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java

@@ -18,6 +18,16 @@
 package org.apache.hadoop.hdfs.server.federation.router;
 
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION;
+import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ASYNCRPC_RESPONDER_COUNT_DEFAULT;
+import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ASYNC_RPC_ENABLE_DEFAULT;
+import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ASYNC_RPC_ENABLE_KEY;
+import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ASYNC_RPC_HANDLER_COUNT_DEFAULT;
+import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ASYNC_RPC_HANDLER_COUNT_KEY;
+import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ASYNC_RPC_NS_HANDLER_COUNT_DEFAULT;
+import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ASYNC_RPC_NS_HANDLER_COUNT_KEY;
+import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ASYNC_RPC_RESPONDER_COUNT_KEY;
+import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FEDERATION_RENAME_OPTION;
+import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FEDERATION_RENAME_OPTION_DEFAULT;
 import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HANDLER_COUNT_DEFAULT;
 import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HANDLER_COUNT_KEY;
 import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HANDLER_QUEUE_SIZE_DEFAULT;
@@ -26,16 +36,8 @@ import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_
 import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_READER_COUNT_KEY;
 import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_READER_QUEUE_SIZE_DEFAULT;
 import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_READER_QUEUE_SIZE_KEY;
-import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT;
-import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT_DEFAULT;
-import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT;
-import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT_DEFAULT;
-import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ENABLE_ASYNC;
-import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ENABLE_ASYNC_DEFAULT;
 import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DN_REPORT_CACHE_EXPIRE;
 import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DN_REPORT_CACHE_EXPIRE_MS_DEFAULT;
-import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FEDERATION_RENAME_OPTION;
-import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FEDERATION_RENAME_OPTION_DEFAULT;
 import static org.apache.hadoop.hdfs.server.federation.router.RouterFederationRename.RouterRenameOption;
 import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncApply;
 import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncCatch;
@@ -56,6 +58,7 @@ import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.EnumSet;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
@@ -63,8 +66,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
@@ -72,6 +75,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.HAUtil;
 import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
@@ -209,6 +213,7 @@ import org.apache.hadoop.tools.proto.GetUserMappingsProtocolProtos;
 import org.apache.hadoop.tools.protocolPB.GetUserMappingsProtocolPB;
 import org.apache.hadoop.tools.protocolPB.GetUserMappingsProtocolServerSideTranslatorPB;
 import org.apache.hadoop.util.ReflectionUtils;
+import org.checkerframework.checker.nullness.qual.NonNull;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -228,8 +233,9 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol,
 
   private static final Logger LOG =
       LoggerFactory.getLogger(RouterRpcServer.class);
-  private ExecutorService asyncRouterHandler;
-  private ExecutorService asyncRouterResponder;
+
+  /** Name service keyword to identify fan-out calls. */
+  public static final String CONCURRENT_NS = "concurrent";
 
   /** Configuration for the RPC server. */
   private Configuration conf;
@@ -287,6 +293,10 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol,
   /** Schedule the router federation rename jobs. */
   private BalanceProcedureScheduler fedRenameScheduler;
   private boolean enableAsync;
+  private Map<String, Integer> nsAsyncHandlerCount = new ConcurrentHashMap<>();
+  private Map<String, ExecutorService> asyncRouterHandlerExecutors = new ConcurrentHashMap<>();
+  private ExecutorService routerAsyncResponderExecutor;
+  private ExecutorService routerDefaultAsyncHandlerExecutor;
 
   /**
    * Construct a router RPC server.
@@ -318,11 +328,11 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol,
     int handlerQueueSize = this.conf.getInt(DFS_ROUTER_HANDLER_QUEUE_SIZE_KEY,
         DFS_ROUTER_HANDLER_QUEUE_SIZE_DEFAULT);
 
-    this.enableAsync = conf.getBoolean(DFS_ROUTER_RPC_ENABLE_ASYNC,
-        DFS_ROUTER_RPC_ENABLE_ASYNC_DEFAULT);
-    LOG.info("Router enable async {}", this.enableAsync);
+    this.enableAsync = conf.getBoolean(DFS_ROUTER_ASYNC_RPC_ENABLE_KEY,
+        DFS_ROUTER_ASYNC_RPC_ENABLE_DEFAULT);
+    LOG.info("Router enable async rpc: {}", this.enableAsync);
     if (this.enableAsync) {
-      initAsyncThreadPool();
+      initAsyncThreadPools(conf);
     }
     // Override Hadoop Common IPC setting
     int readerQueueSize = this.conf.getInt(DFS_ROUTER_READER_QUEUE_SIZE_KEY,
@@ -446,8 +456,7 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol,
     // Create the client
     if (this.enableAsync) {
       this.rpcClient = new RouterAsyncRpcClient(this.conf, this.router,
-          this.namenodeResolver, this.rpcMonitor,
-          routerStateIdContext, asyncRouterHandler);
+          this.namenodeResolver, this.rpcMonitor, routerStateIdContext);
       this.clientProto = new RouterAsyncClientProtocol(conf, this);
       this.nnProto = new RouterAsyncNamenodeProtocol(this);
       this.routerProto = new RouterAsyncUserProtocol(this);
@@ -491,23 +500,77 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol,
 
   /**
    * Init router async handlers and router async responders.
+   * @param configuration the configuration.
    */
-  public void initAsyncThreadPool() {
-    int asyncHandlerCount = conf.getInt(DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT,
-        DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT_DEFAULT);
-    int asyncResponderCount = conf.getInt(DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT,
-        DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT_DEFAULT);
-    if (asyncRouterHandler == null) {
-      LOG.info("init router async handler count: {}", asyncHandlerCount);
-      asyncRouterHandler = Executors.newFixedThreadPool(
-          asyncHandlerCount, new AsyncThreadFactory("router async handler "));
+  public void initAsyncThreadPools(Configuration configuration) {
+    LOG.info("Begin initialize asynchronous handler and responder thread pool.");
+    initNsAsyncHandlerCount();
+    Set<String> allConfiguredNS = FederationUtil.getAllConfiguredNS(configuration);
+    Set<String> unassignedNS = new HashSet<>();
+    allConfiguredNS.add(CONCURRENT_NS);
+
+    for (String nsId : allConfiguredNS) {
+      int dedicatedHandlers = nsAsyncHandlerCount.getOrDefault(nsId, 0);
+      LOG.info("Dedicated handlers {} for ns {} ", dedicatedHandlers, nsId);
+      if (dedicatedHandlers > 0) {
+        initAsyncHandlerThreadPools4Ns(nsId, dedicatedHandlers);
+        LOG.info("Assigned {} async handlers to nsId {} ", dedicatedHandlers, nsId);
+      } else {
+        unassignedNS.add(nsId);
+      }
+    }
+
+    int asyncHandlerCountDefault = configuration.getInt(DFS_ROUTER_ASYNC_RPC_HANDLER_COUNT_KEY,
+        DFS_ROUTER_ASYNC_RPC_HANDLER_COUNT_DEFAULT);
+
+    if (!unassignedNS.isEmpty()) {
+      LOG.warn("Async handler unassigned ns: {}", unassignedNS);
+      LOG.info("Use default async handler count {} for unassigned ns.", asyncHandlerCountDefault);
+      for (String nsId : unassignedNS) {
+        initAsyncHandlerThreadPools4Ns(nsId, asyncHandlerCountDefault);
+      }
     }
-    if (asyncRouterResponder == null) {
-      LOG.info("init router async responder count: {}", asyncResponderCount);
-      asyncRouterResponder = Executors.newFixedThreadPool(
-          asyncResponderCount, new AsyncThreadFactory("router async responder "));
+
+    int asyncResponderCount = configuration.getInt(DFS_ROUTER_ASYNC_RPC_RESPONDER_COUNT_KEY,
+        DFS_ROUTER_ASYNCRPC_RESPONDER_COUNT_DEFAULT);
+    if (routerAsyncResponderExecutor == null) {
+      LOG.info("Initialize router async responder count: {}", asyncResponderCount);
+      routerAsyncResponderExecutor = Executors.newFixedThreadPool(
+          asyncResponderCount, new AsyncThreadFactory("Router Async Responder #"));
+    }
+    AsyncRpcProtocolPBUtil.setAsyncResponderExecutor(routerAsyncResponderExecutor);
+
+    if (routerDefaultAsyncHandlerExecutor == null) {
+      LOG.info("init router async default executor handler count: {}", asyncHandlerCountDefault);
+      routerDefaultAsyncHandlerExecutor = Executors.newFixedThreadPool(
+          asyncHandlerCountDefault, new AsyncThreadFactory("Router Async Default Handler #"));
+    }
+  }
+
+  private void initNsAsyncHandlerCount() {
+    String configNsHandler = conf.get(DFS_ROUTER_ASYNC_RPC_NS_HANDLER_COUNT_KEY,
+        DFS_ROUTER_ASYNC_RPC_NS_HANDLER_COUNT_DEFAULT);
+    if (StringUtils.isEmpty(configNsHandler)) {
+      LOG.error(
+          "The value of config key: {} is empty. Will use default conf.",
+          DFS_ROUTER_ASYNC_RPC_NS_HANDLER_COUNT_KEY);
     }
-    AsyncRpcProtocolPBUtil.setWorker(asyncRouterResponder);
+    String[] nsHandlers = configNsHandler.split(",");
+    for (String nsHandlerInfo : nsHandlers) {
+      String[] nsHandlerItems = nsHandlerInfo.split(":");
+      if (nsHandlerItems.length != 2 || StringUtils.isBlank(nsHandlerItems[0]) ||
+          !StringUtils.isNumeric(nsHandlerItems[1])) {
+        LOG.error("The config key: {} is incorrect! The value is {}.",
+            DFS_ROUTER_ASYNC_RPC_NS_HANDLER_COUNT_KEY, nsHandlerInfo);
+        continue;
+      }
+      nsAsyncHandlerCount.put(nsHandlerItems[0], Integer.parseInt(nsHandlerItems[1]));
+    }
+  }
+
+  private void initAsyncHandlerThreadPools4Ns(String nsId, int dedicatedHandlers) {
+    asyncRouterHandlerExecutors.computeIfAbsent(nsId, id -> Executors.newFixedThreadPool(
+        dedicatedHandlers, new AsyncThreadFactory("Router Async Handler for " + id + " #")));
   }
 
   /**
@@ -2426,8 +2489,12 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol,
     return this.enableAsync;
   }
 
-  public Executor getAsyncRouterHandler() {
-    return asyncRouterHandler;
+  public Map<String, ExecutorService> getAsyncRouterHandlerExecutors() {
+    return asyncRouterHandlerExecutors;
+  }
+
+  public ExecutorService getRouterAsyncHandlerDefaultExecutor() {
+    return routerDefaultAsyncHandlerExecutor;
   }
 
   private static class AsyncThreadFactory implements ThreadFactory {
@@ -2439,8 +2506,10 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol,
     }
 
     @Override
-    public Thread newThread(Runnable r) {
-      return new Thread(r, namePrefix + threadNumber.getAndIncrement());
+    public Thread newThread(@NonNull Runnable r) {
+      Thread thread = new Thread(r, namePrefix + threadNumber.getAndIncrement());
+      thread.setDaemon(true);
+      return thread;
     }
   }
 }

+ 4 - 6
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncRpcClient.java

@@ -57,7 +57,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
 
 import static org.apache.hadoop.hdfs.server.federation.fairness.RouterRpcFairnessConstants.CONCURRENT_NS;
 import static org.apache.hadoop.hdfs.server.federation.router.async.utils.Async.warpCompletionException;
@@ -98,7 +97,6 @@ public class RouterAsyncRpcClient extends RouterRpcClient{
   private final ActiveNamenodeResolver namenodeResolver;
   /** Optional perf monitor. */
   private final RouterRpcMonitor rpcMonitor;
-  private final Executor asyncRouterHandler;
 
   /**
    * Create a router async RPC client to manage remote procedure calls to NNs.
@@ -108,17 +106,15 @@ public class RouterAsyncRpcClient extends RouterRpcClient{
    * @param resolver A NN resolver to determine the currently active NN in HA.
    * @param monitor Optional performance monitor.
    * @param routerStateIdContext the router state context object to hold the state ids for all
-   * @param asyncRouterHandler async router handler
    * namespaces.
    */
   public RouterAsyncRpcClient(Configuration conf,
       Router router, ActiveNamenodeResolver resolver, RouterRpcMonitor monitor,
-      RouterStateIdContext routerStateIdContext, Executor asyncRouterHandler) {
+      RouterStateIdContext routerStateIdContext) {
     super(conf, router, resolver, monitor, routerStateIdContext);
     this.router = router;
     this.namenodeResolver = resolver;
     this.rpcMonitor = monitor;
-    this.asyncRouterHandler = asyncRouterHandler;
   }
 
   /**
@@ -172,6 +168,7 @@ public class RouterAsyncRpcClient extends RouterRpcClient{
           " with params " + Arrays.deepToString(params) + " from "
           + router.getRouterId());
     }
+    String nsid = namenodes.get(0).getNameserviceId();
     // transfer threadLocalContext to worker threads of executor.
     ThreadLocalContext threadLocalContext = new ThreadLocalContext();
     asyncComplete(null);
@@ -183,7 +180,8 @@ public class RouterAsyncRpcClient extends RouterRpcClient{
       threadLocalContext.transfer();
       invokeMethodAsync(ugi, (List<FederationNamenodeContext>) namenodes,
           useObserver, protocol, method, params);
-    }, asyncRouterHandler);
+    }, router.getRpcServer().getAsyncRouterHandlerExecutors().getOrDefault(nsid,
+        router.getRpcServer().getRouterAsyncHandlerDefaultExecutor()));
     return null;
   }
 

+ 31 - 4
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml

@@ -49,13 +49,40 @@
   </property>
 
   <property>
-    <name>dfs.federation.router.rpc.async.enable</name>
+    <name>dfs.federation.router.async.rpc.enable</name>
     <value>false</value>
     <description>
       If true, router will process the RPC request asynchronously.
     </description>
   </property>
 
+  <property>
+    <name>dfs.federation.router.async.rpc.ns.handler.count</name>
+    <value></value>
+    <description>
+      The number of asynchronous handlers per nameservice, separated by commas, internally separated by colons.
+      The identifier of nameservice is in dfs.nameservices configuration entry.
+      Such as: ns1:count1,ns2:count2,ns3:count3.
+    </description>
+  </property>
+
+  <property>
+    <name>dfs.federation.router.async.rpc.responder.count</name>
+    <value>10</value>
+    <description>
+      For those nameservices not in dfs.federation.router.async.rpc.ns.handler.count configuration entry,
+      use this value as the asynchronous handler thread counts.
+    </description>
+  </property>
+
+  <property>
+    <name>dfs.federation.router.async.rpc.responder.count</name>
+    <value>10</value>
+    <description>
+      The thread counts of async responder executor.
+    </description>
+  </property>
+
   <property>
     <name>dfs.federation.router.rpc-address</name>
     <value>0.0.0.0:8888</value>
@@ -110,15 +137,15 @@
   </property>
 
   <property>
-    <name>dfs.federation.router.rpc.async.handler.count</name>
-    <value>2</value>
+    <name>dfs.federation.router.async.rpc.handler.count</name>
+    <value>10</value>
     <description>
       The number of async handler for the router to handle RPC client requests.
     </description>
   </property>
 
   <property>
-    <name>dfs.federation.router.rpc.async.responder.count</name>
+    <name>dfs.federation.router.async.rpc.responder.count</name>
     <value>10</value>
     <description>
       The number of async responder for the router to handle responses.

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestAsyncRpcProtocolPBUtil.java

@@ -54,7 +54,7 @@ public class TestAsyncRpcProtocolPBUtil {
 
   @Before
   public void setUp() throws IOException {
-    AsyncRpcProtocolPBUtil.setWorker(ForkJoinPool.commonPool());
+    AsyncRpcProtocolPBUtil.setAsyncResponderExecutor(ForkJoinPool.commonPool());
     Configuration conf = new Configuration();
     RPC.setProtocolEngine(conf, TestRpcBase.TestRpcService.class,
         ProtobufRpcEngine2.class);

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestRouterClientSideTranslatorPB.java

@@ -82,7 +82,7 @@ public class TestRouterClientSideTranslatorPB {
 
   @BeforeClass
   public static void setUp() throws Exception {
-    AsyncRpcProtocolPBUtil.setWorker(ForkJoinPool.commonPool());
+    AsyncRpcProtocolPBUtil.setAsyncResponderExecutor(ForkJoinPool.commonPool());
     conf = new HdfsConfiguration();
     cluster = (new MiniDFSCluster.Builder(conf))
         .numDataNodes(1).build();

+ 6 - 7
hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncProtocolTestBase.java

@@ -38,8 +38,8 @@ import java.util.concurrent.TimeUnit;
 
 import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.NAMENODES;
 import static org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.DEFAULT_HEARTBEAT_INTERVAL_MS;
-import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT;
-import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT;
+import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ASYNC_RPC_HANDLER_COUNT_KEY;
+import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ASYNC_RPC_RESPONDER_COUNT_KEY;
 import static org.junit.Assert.assertTrue;
 
 /**
@@ -80,8 +80,8 @@ public class RouterAsyncProtocolTestBase {
 
     // Reduce the number of RPC clients threads to overload the Router easy
     routerConf.setInt(RBFConfigKeys.DFS_ROUTER_CLIENT_THREADS_SIZE, 1);
-    routerConf.setInt(DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT, 1);
-    routerConf.setInt(DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT, 1);
+    routerConf.setInt(DFS_ROUTER_ASYNC_RPC_HANDLER_COUNT_KEY, 1);
+    routerConf.setInt(DFS_ROUTER_ASYNC_RPC_RESPONDER_COUNT_KEY, 1);
     // We decrease the DN cache times to make the test faster
     routerConf.setTimeDuration(
         RBFConfigKeys.DN_REPORT_CACHE_EXPIRE, 1, TimeUnit.SECONDS);
@@ -108,12 +108,11 @@ public class RouterAsyncProtocolTestBase {
     router = cluster.getRandomRouter();
     routerFs = router.getFileSystem();
     routerRpcServer = router.getRouterRpcServer();
-    routerRpcServer.initAsyncThreadPool();
+    routerRpcServer.initAsyncThreadPools(routerConf);
     RouterAsyncRpcClient asyncRpcClient = new RouterAsyncRpcClient(
         routerConf, router.getRouter(), routerRpcServer.getNamenodeResolver(),
         routerRpcServer.getRPCMonitor(),
-        routerRpcServer.getRouterStateIdContext(),
-        routerRpcServer.getAsyncRouterHandler());
+        routerRpcServer.getRouterStateIdContext());
     routerAsyncRpcServer = Mockito.spy(routerRpcServer);
     Mockito.when(routerAsyncRpcServer.getRPCClient()).thenReturn(asyncRpcClient);
     Mockito.when(routerAsyncRpcServer.isAsync()).thenReturn(true);

+ 6 - 7
hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncErasureCoding.java

@@ -49,8 +49,8 @@ import java.util.concurrent.TimeUnit;
 
 import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.NAMENODES;
 import static org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.DEFAULT_HEARTBEAT_INTERVAL_MS;
-import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT;
-import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT;
+import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ASYNC_RPC_HANDLER_COUNT_KEY;
+import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ASYNC_RPC_RESPONDER_COUNT_KEY;
 import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.syncReturn;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
@@ -94,8 +94,8 @@ public class TestRouterAsyncErasureCoding {
 
     // Reduce the number of RPC clients threads to overload the Router easy
     routerConf.setInt(RBFConfigKeys.DFS_ROUTER_CLIENT_THREADS_SIZE, 1);
-    routerConf.setInt(DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT, 1);
-    routerConf.setInt(DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT, 1);
+    routerConf.setInt(DFS_ROUTER_ASYNC_RPC_HANDLER_COUNT_KEY, 1);
+    routerConf.setInt(DFS_ROUTER_ASYNC_RPC_RESPONDER_COUNT_KEY, 1);
     // We decrease the DN cache times to make the test faster
     routerConf.setTimeDuration(
         RBFConfigKeys.DN_REPORT_CACHE_EXPIRE, 1, TimeUnit.SECONDS);
@@ -122,12 +122,11 @@ public class TestRouterAsyncErasureCoding {
     router = cluster.getRandomRouter();
     routerFs = router.getFileSystem();
     routerRpcServer = router.getRouterRpcServer();
-    routerRpcServer.initAsyncThreadPool();
+    routerRpcServer.initAsyncThreadPools(routerConf);
     RouterAsyncRpcClient asyncRpcClient = new RouterAsyncRpcClient(
         routerConf, router.getRouter(), routerRpcServer.getNamenodeResolver(),
         routerRpcServer.getRPCMonitor(),
-        routerRpcServer.getRouterStateIdContext(),
-        routerRpcServer.getAsyncRouterHandler());
+        routerRpcServer.getRouterStateIdContext());
     RouterRpcServer spy = Mockito.spy(routerRpcServer);
     Mockito.when(spy.getRPCClient()).thenReturn(asyncRpcClient);
     asyncErasureCoding = new AsyncErasureCoding(spy);

+ 6 - 7
hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncQuota.java

@@ -44,8 +44,8 @@ import java.util.concurrent.TimeUnit;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_QUOTA_BY_STORAGETYPE_ENABLED_KEY;
 import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.NAMENODES;
 import static org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.DEFAULT_HEARTBEAT_INTERVAL_MS;
-import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT;
-import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT;
+import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ASYNC_RPC_HANDLER_COUNT_KEY;
+import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ASYNC_RPC_RESPONDER_COUNT_KEY;
 import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.syncReturn;
 import static org.junit.Assert.assertTrue;
 
@@ -87,8 +87,8 @@ public class TestRouterAsyncQuota {
 
     // Reduce the number of RPC clients threads to overload the Router easy
     routerConf.setInt(RBFConfigKeys.DFS_ROUTER_CLIENT_THREADS_SIZE, 1);
-    routerConf.setInt(DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT, 1);
-    routerConf.setInt(DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT, 1);
+    routerConf.setInt(DFS_ROUTER_ASYNC_RPC_HANDLER_COUNT_KEY, 1);
+    routerConf.setInt(DFS_ROUTER_ASYNC_RPC_RESPONDER_COUNT_KEY, 1);
     // We decrease the DN cache times to make the test faster
     routerConf.setTimeDuration(
         RBFConfigKeys.DN_REPORT_CACHE_EXPIRE, 1, TimeUnit.SECONDS);
@@ -116,12 +116,11 @@ public class TestRouterAsyncQuota {
     router = cluster.getRandomRouter();
     routerFs = router.getFileSystem();
     routerRpcServer = router.getRouterRpcServer();
-    routerRpcServer.initAsyncThreadPool();
+    routerRpcServer.initAsyncThreadPools(routerConf);
     RouterAsyncRpcClient asyncRpcClient = new RouterAsyncRpcClient(
         routerConf, router.getRouter(), routerRpcServer.getNamenodeResolver(),
         routerRpcServer.getRPCMonitor(),
-        routerRpcServer.getRouterStateIdContext(),
-        routerRpcServer.getAsyncRouterHandler());
+        routerRpcServer.getRouterStateIdContext());
     RouterRpcServer spy = Mockito.spy(routerRpcServer);
     Mockito.when(spy.getRPCClient()).thenReturn(asyncRpcClient);
     asyncQuota = new AsyncQuota(router.getRouter(), spy);

+ 2 - 2
hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncRpc.java

@@ -29,7 +29,7 @@ import org.junit.Test;
 
 import java.util.concurrent.TimeUnit;
 
-import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ENABLE_ASYNC;
+import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ASYNC_RPC_ENABLE_KEY;
 import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.syncReturn;
 import static org.junit.Assert.assertArrayEquals;
 
@@ -51,7 +51,7 @@ public class TestRouterAsyncRpc extends TestRouterRpc {
     routerConf.setTimeDuration(
         RBFConfigKeys.DN_REPORT_CACHE_EXPIRE, 1, TimeUnit.SECONDS);
     // use async router.
-    routerConf.setBoolean(DFS_ROUTER_RPC_ENABLE_ASYNC, true);
+    routerConf.setBoolean(DFS_ROUTER_ASYNC_RPC_ENABLE_KEY, true);
     setUp(routerConf);
   }
 

+ 6 - 7
hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncRpcClient.java

@@ -57,8 +57,8 @@ import java.util.concurrent.TimeUnit;
 import static org.apache.hadoop.fs.permission.FsAction.ALL;
 import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.NAMENODES;
 import static org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.DEFAULT_HEARTBEAT_INTERVAL_MS;
-import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT;
-import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT;
+import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ASYNC_RPC_HANDLER_COUNT_KEY;
+import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ASYNC_RPC_RESPONDER_COUNT_KEY;
 import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.syncReturn;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
@@ -110,8 +110,8 @@ public class TestRouterAsyncRpcClient {
 
     // Reduce the number of RPC clients threads to overload the Router easy
     routerConf.setInt(RBFConfigKeys.DFS_ROUTER_CLIENT_THREADS_SIZE, 1);
-    routerConf.setInt(DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT, 1);
-    routerConf.setInt(DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT, 1);
+    routerConf.setInt(DFS_ROUTER_ASYNC_RPC_HANDLER_COUNT_KEY, 1);
+    routerConf.setInt(DFS_ROUTER_ASYNC_RPC_RESPONDER_COUNT_KEY, 1);
     // We decrease the DN cache times to make the test faster
     routerConf.setTimeDuration(
         RBFConfigKeys.DN_REPORT_CACHE_EXPIRE, 1, TimeUnit.SECONDS);
@@ -146,14 +146,13 @@ public class TestRouterAsyncRpcClient {
     rpcMetrics = router.getRouter().getRpcServer().getRPCMetrics();
     routerFs = router.getFileSystem();
     routerRpcServer = router.getRouterRpcServer();
-    routerRpcServer.initAsyncThreadPool();
+    routerRpcServer.initAsyncThreadPools(routerConf);
 
     // Create a RouterAsyncRpcClient object
     asyncRpcClient = new RouterAsyncRpcClient(
         routerConf, router.getRouter(), routerRpcServer.getNamenodeResolver(),
         routerRpcServer.getRPCMonitor(),
-        routerRpcServer.getRouterStateIdContext(),
-        routerRpcServer.getAsyncRouterHandler());
+        routerRpcServer.getRouterStateIdContext());
 
     // Create a test file
     FSDataOutputStream fsDataOutputStream = routerFs.create(

+ 2 - 2
hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncRpcMultiDestination.java

@@ -28,7 +28,7 @@ import org.junit.Test;
 
 import java.util.concurrent.TimeUnit;
 
-import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ENABLE_ASYNC;
+import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ASYNC_RPC_ENABLE_KEY;
 import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.syncReturn;
 import static org.junit.Assert.assertArrayEquals;
 
@@ -48,7 +48,7 @@ public class TestRouterAsyncRpcMultiDestination extends TestRouterRpcMultiDestin
     routerConf.setTimeDuration(
         RBFConfigKeys.DN_REPORT_CACHE_EXPIRE, 1, TimeUnit.SECONDS);
     // use async router.
-    routerConf.setBoolean(DFS_ROUTER_RPC_ENABLE_ASYNC, true);
+    routerConf.setBoolean(DFS_ROUTER_ASYNC_RPC_ENABLE_KEY, true);
     setUp(routerConf);
   }