Bläddra i källkod

HDFS-17545. [ARR] router async rpc client. (#6871). Contributed by Jian Zhang.

Reviewed-by: hfutatzhanghb <hfutzhanghb@163.com>
Signed-off-by: He Xiaoqiao <hexiaoqiao@apache.org>
Jian Zhang 7 månader sedan
förälder
incheckning
71c74660b6
12 ändrade filer med 1624 tillägg och 198 borttagningar
  1. 52 13
      hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/protocolPB/AsyncRpcProtocolPBUtil.java
  2. 18 2
      hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCPerformanceMonitor.java
  3. 9 0
      hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java
  4. 621 0
      hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterAsyncRpcClient.java
  5. 388 173
      hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java
  6. 71 7
      hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
  7. 101 0
      hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ThreadLocalContext.java
  8. 22 3
      hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/AsyncUtil.java
  9. 24 0
      hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml
  10. 2 0
      hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestAsyncRpcProtocolPBUtil.java
  11. 2 0
      hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestRouterClientSideTranslatorPB.java
  12. 314 0
      hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAsyncRpcClient.java

+ 52 - 13
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/protocolPB/AsyncRpcProtocolPBUtil.java

@@ -18,12 +18,11 @@
 
 
 package org.apache.hadoop.hdfs.protocolPB;
 package org.apache.hadoop.hdfs.protocolPB;
 
 
+import org.apache.hadoop.hdfs.server.federation.router.ThreadLocalContext;
 import org.apache.hadoop.hdfs.server.federation.router.async.ApplyFunction;
 import org.apache.hadoop.hdfs.server.federation.router.async.ApplyFunction;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.ipc.CallerContext;
 import org.apache.hadoop.ipc.Client;
 import org.apache.hadoop.ipc.Client;
 import org.apache.hadoop.ipc.ProtobufRpcEngine2;
 import org.apache.hadoop.ipc.ProtobufRpcEngine2;
-import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.ipc.internal.ShadedProtobufHelper;
 import org.apache.hadoop.ipc.internal.ShadedProtobufHelper;
 import org.apache.hadoop.util.concurrent.AsyncGet;
 import org.apache.hadoop.util.concurrent.AsyncGet;
 import org.slf4j.Logger;
 import org.slf4j.Logger;
@@ -31,18 +30,48 @@ import org.slf4j.LoggerFactory;
 
 
 import java.io.IOException;
 import java.io.IOException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
 
 
 import static org.apache.hadoop.hdfs.server.federation.router.async.Async.warpCompletionException;
 import static org.apache.hadoop.hdfs.server.federation.router.async.Async.warpCompletionException;
-import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncApply;
 import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncCompleteWith;
 import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncCompleteWith;
 import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncReturn;
 import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncReturn;
 import static org.apache.hadoop.ipc.internal.ShadedProtobufHelper.ipc;
 import static org.apache.hadoop.ipc.internal.ShadedProtobufHelper.ipc;
 
 
+/**
+ * <p>This utility class encapsulates the logic required to initiate asynchronous RPCs,
+ * handle responses, and propagate exceptions. It works in conjunction with
+ * {@link ProtobufRpcEngine2} and {@link Client} to facilitate the asynchronous
+ * nature of the operations.
+ *
+ * @see ProtobufRpcEngine2
+ * @see Client
+ * @see CompletableFuture
+ */
 public final class AsyncRpcProtocolPBUtil {
 public final class AsyncRpcProtocolPBUtil {
   public static final Logger LOG = LoggerFactory.getLogger(AsyncRpcProtocolPBUtil.class);
   public static final Logger LOG = LoggerFactory.getLogger(AsyncRpcProtocolPBUtil.class);
+  /** The executor used for handling responses asynchronously. */
+  private static Executor worker;
 
 
   private AsyncRpcProtocolPBUtil() {}
   private AsyncRpcProtocolPBUtil() {}
 
 
+  /**
+   * Asynchronously invokes an RPC call and applies a response transformation function
+   * to the result. This method is generic and can be used to handle any type of
+   * RPC call.
+   *
+   * <p>The method uses the {@link ShadedProtobufHelper.IpcCall} to prepare the RPC call
+   * and the {@link ApplyFunction} to process the response. It also handles exceptions
+   * that may occur during the RPC call and wraps them in a user-friendly manner.
+   *
+   * @param call The IPC call encapsulating the RPC request.
+   * @param response The function to apply to the response of the RPC call.
+   * @param clazz The class object representing the type {@code R} of the response.
+   * @param <T> Type of the call's result.
+   * @param <R> Type of method return.
+   * @return An object of type {@code R} that is the result of applying the response
+   *         function to the RPC call result.
+   * @throws IOException If an I/O error occurs during the asynchronous RPC call.
+   */
   public static <T, R> R asyncIpcClient(
   public static <T, R> R asyncIpcClient(
       ShadedProtobufHelper.IpcCall<T> call, ApplyFunction<T, R> response,
       ShadedProtobufHelper.IpcCall<T> call, ApplyFunction<T, R> response,
       Class<R> clazz) throws IOException {
       Class<R> clazz) throws IOException {
@@ -50,20 +79,30 @@ public final class AsyncRpcProtocolPBUtil {
     AsyncGet<T, Exception> asyncReqMessage =
     AsyncGet<T, Exception> asyncReqMessage =
         (AsyncGet<T, Exception>) ProtobufRpcEngine2.getAsyncReturnMessage();
         (AsyncGet<T, Exception>) ProtobufRpcEngine2.getAsyncReturnMessage();
     CompletableFuture<Writable> responseFuture = Client.getResponseFuture();
     CompletableFuture<Writable> responseFuture = Client.getResponseFuture();
-    // transfer originCall & callerContext to worker threads of executor.
-    final Server.Call originCall = Server.getCurCall().get();
-    final CallerContext originContext = CallerContext.getCurrent();
-    asyncCompleteWith(responseFuture);
-    asyncApply(o -> {
+    // transfer thread local context to worker threads of executor.
+    ThreadLocalContext threadLocalContext = new ThreadLocalContext();
+    asyncCompleteWith(responseFuture.handleAsync((result, e) -> {
+      threadLocalContext.transfer();
+      if (e != null) {
+        throw warpCompletionException(e);
+      }
       try {
       try {
-        Server.getCurCall().set(originCall);
-        CallerContext.setCurrent(originContext);
         T res = asyncReqMessage.get(-1, null);
         T res = asyncReqMessage.get(-1, null);
         return response.apply(res);
         return response.apply(res);
-      } catch (Exception e) {
-        throw warpCompletionException(e);
+      } catch (Exception ex) {
+        throw warpCompletionException(ex);
       }
       }
-    });
+    }, worker));
     return asyncReturn(clazz);
     return asyncReturn(clazz);
   }
   }
+
+  /**
+   * Sets the executor used for handling responses asynchronously within
+   * the utility class.
+   *
+   * @param worker The executor to be used for handling responses asynchronously.
+   */
+  public static void setWorker(Executor worker) {
+    AsyncRpcProtocolPBUtil.worker = worker;
+  }
 }
 }

+ 18 - 2
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCPerformanceMonitor.java

@@ -52,9 +52,9 @@ public class FederationRPCPerformanceMonitor implements RouterRpcMonitor {
 
 
 
 
   /** Time for an operation to be received in the Router. */
   /** Time for an operation to be received in the Router. */
-  private static final ThreadLocal<Long> START_TIME = new ThreadLocal<>();
+  private static final ThreadLocal<Long> START_TIME = ThreadLocal.withInitial(() -> -1L);
   /** Time for an operation to be sent to the Namenode. */
   /** Time for an operation to be sent to the Namenode. */
-  private static final ThreadLocal<Long> PROXY_TIME = new ThreadLocal<>();
+  private static final ThreadLocal<Long> PROXY_TIME = ThreadLocal.withInitial(() -> -1L);
 
 
   /** Configuration for the performance monitor. */
   /** Configuration for the performance monitor. */
   private Configuration conf;
   private Configuration conf;
@@ -141,6 +141,14 @@ public class FederationRPCPerformanceMonitor implements RouterRpcMonitor {
     START_TIME.set(monotonicNow());
     START_TIME.set(monotonicNow());
   }
   }
 
 
+  public static long getStartOpTime() {
+    return START_TIME.get();
+  }
+
+  public static void setStartOpTime(long startOpTime) {
+    START_TIME.set(startOpTime);
+  }
+
   @Override
   @Override
   public long proxyOp() {
   public long proxyOp() {
     PROXY_TIME.set(monotonicNow());
     PROXY_TIME.set(monotonicNow());
@@ -151,6 +159,14 @@ public class FederationRPCPerformanceMonitor implements RouterRpcMonitor {
     return Thread.currentThread().getId();
     return Thread.currentThread().getId();
   }
   }
 
 
+  public static long getProxyOpTime() {
+    return PROXY_TIME.get();
+  }
+
+  public static void setProxyOpTime(long proxyOpTime) {
+    PROXY_TIME.set(proxyOpTime);
+  }
+
   @Override
   @Override
   public void proxyOpComplete(boolean success, String nsId,
   public void proxyOpComplete(boolean success, String nsId,
       FederationNamenodeServiceState state) {
       FederationNamenodeServiceState state) {

+ 9 - 0
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java

@@ -72,6 +72,15 @@ public class RBFConfigKeys extends CommonConfigurationKeysPublic {
   public static final String DFS_ROUTER_RPC_ENABLE =
   public static final String DFS_ROUTER_RPC_ENABLE =
       FEDERATION_ROUTER_PREFIX + "rpc.enable";
       FEDERATION_ROUTER_PREFIX + "rpc.enable";
   public static final boolean DFS_ROUTER_RPC_ENABLE_DEFAULT = true;
   public static final boolean DFS_ROUTER_RPC_ENABLE_DEFAULT = true;
+  public static final String DFS_ROUTER_RPC_ENABLE_ASYNC =
+      FEDERATION_ROUTER_PREFIX + "rpc.async.enable";
+  public static final boolean DFS_ROUTER_RPC_ENABLE_ASYNC_DEFAULT = false;
+  public static final String DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT =
+      FEDERATION_ROUTER_PREFIX + "rpc.async.handler.count";
+  public static final int DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT_DEFAULT = 2;
+  public static final String DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT =
+      FEDERATION_ROUTER_PREFIX + "rpc.async.responder.count";
+  public static final int DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT_DEFAULT = 10;
 
 
   public static final String DFS_ROUTER_METRICS_ENABLE =
   public static final String DFS_ROUTER_METRICS_ENABLE =
       FEDERATION_ROUTER_PREFIX + "metrics.enable";
       FEDERATION_ROUTER_PREFIX + "metrics.enable";

+ 621 - 0
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterAsyncRpcClient.java

@@ -0,0 +1,621 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.NameNodeProxiesClient;
+import org.apache.hadoop.hdfs.server.federation.fairness.RouterRpcFairnessPolicyController;
+import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
+import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeContext;
+import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState;
+import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
+import org.apache.hadoop.hdfs.server.federation.router.async.ApplyFunction;
+import org.apache.hadoop.hdfs.server.federation.router.async.AsyncApplyFunction;
+import org.apache.hadoop.hdfs.server.federation.router.async.AsyncCatchFunction;
+import org.apache.hadoop.ipc.Client;
+import org.apache.hadoop.ipc.StandbyException;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.net.ConnectException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+import static org.apache.hadoop.hdfs.server.federation.fairness.RouterRpcFairnessConstants.CONCURRENT_NS;
+import static org.apache.hadoop.hdfs.server.federation.router.async.Async.warpCompletionException;
+import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncApply;
+import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncApplyUseExecutor;
+import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncCatch;
+import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncComplete;
+import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncCompleteWith;
+import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncFinally;
+import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncForEach;
+import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncReturn;
+import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncThrowException;
+import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncTry;
+import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.getCompletableFuture;
+
+/**
+ * The {@code RouterAsyncRpcClient} class extends the functionality of the base
+ * {@code RouterRpcClient} class to provide asynchronous remote procedure call (RPC)
+ * capabilities for communication with the Hadoop Distributed File System (HDFS)
+ * NameNodes in a federated environment.
+ *
+ * <p>This class is responsible for managing the asynchronous execution of RPCs to
+ * multiple NameNodes, which can improve performance and scalability in large HDFS
+ * deployments.
+ *
+ * <p>The class also includes methods for handling failover scenarios, where it can
+ * automatically retry operations on alternative NameNodes if the primary NameNode is
+ * unavailable or in standby mode.
+ *
+ * @see RouterRpcClient
+ */
+public class RouterAsyncRpcClient extends RouterRpcClient{
+  private static final Logger LOG =
+      LoggerFactory.getLogger(RouterAsyncRpcClient.class);
+  /** Router using this RPC client. */
+  private final Router router;
+  /** Interface to identify the active NN for a nameservice or blockpool ID. */
+  private final ActiveNamenodeResolver namenodeResolver;
+  /** Optional perf monitor. */
+  private final RouterRpcMonitor rpcMonitor;
+  private final Executor asyncRouterHandler;
+
+  /**
+   * Create a router async RPC client to manage remote procedure calls to NNs.
+   *
+   * @param conf Hdfs Configuration.
+   * @param router A router using this RPC client.
+   * @param resolver A NN resolver to determine the currently active NN in HA.
+   * @param monitor Optional performance monitor.
+   * @param routerStateIdContext the router state context object to hold the state ids for all
+   * namespaces.
+   */
+  public RouterAsyncRpcClient(
+      Configuration conf, Router router, ActiveNamenodeResolver resolver,
+      RouterRpcMonitor monitor, RouterStateIdContext routerStateIdContext) {
+    super(conf, router, resolver, monitor, routerStateIdContext);
+    this.router = router;
+    this.namenodeResolver = resolver;
+    this.rpcMonitor = monitor;
+    this.asyncRouterHandler = router.getRpcServer().getAsyncRouterHandler();
+  }
+
+  /**
+   * Invoke method in all locations and return success if any succeeds.
+   *
+   * @param <T> The type of the remote location.
+   * @param locations List of remote locations to call concurrently.
+   * @param method The remote method and parameters to invoke.
+   * @return If the call succeeds in any location.
+   * @throws IOException If any of the calls return an exception.
+   */
+  @Override
+  public <T extends RemoteLocationContext> boolean invokeAll(
+      final Collection<T> locations, final RemoteMethod method)
+      throws IOException {
+    invokeConcurrent(locations, method, false, false,
+        Boolean.class);
+    asyncApply((ApplyFunction<Map<T, Boolean>, Object>)
+        results -> results.containsValue(true));
+    return asyncReturn(boolean.class);
+  }
+
+  /**
+   * Invokes a method against the ClientProtocol proxy server. If a standby
+   * exception is generated by the call to the client, retries using the
+   * alternate server.
+   * <p>
+   * Re-throws exceptions generated by the remote RPC call as either
+   * RemoteException or IOException.
+   *
+   * @param ugi User group information.
+   * @param namenodes A prioritized list of namenodes within the same
+   *                  nameservice.
+   * @param useObserver Whether to use observer namenodes.
+   * @param protocol the protocol of the connection.
+   * @param method Remote ClientProtocol method to invoke.
+   * @param params Variable list of parameters matching the method.
+   * @return The result of invoking the method.
+   * @throws ConnectException If it cannot connect to any Namenode.
+   * @throws StandbyException If all Namenodes are in Standby.
+   * @throws IOException If it cannot invoke the method.
+   */
+  @Override
+  public Object invokeMethod(
+      UserGroupInformation ugi,
+      List<? extends FederationNamenodeContext> namenodes,
+      boolean useObserver, Class<?> protocol,
+      Method method, Object... params) throws IOException {
+    if (namenodes == null || namenodes.isEmpty()) {
+      throw new IOException("No namenodes to invoke " + method.getName() +
+          " with params " + Arrays.deepToString(params) + " from "
+          + router.getRouterId());
+    }
+    // transfer threadLocalContext to worker threads of executor.
+    ThreadLocalContext threadLocalContext = new ThreadLocalContext();
+    asyncComplete(null);
+    asyncApplyUseExecutor((AsyncApplyFunction<Object, Object>) o -> {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Async invoke method : {}, {}, {}, {}", method.getName(), useObserver,
+            namenodes.toString(), params);
+      }
+      threadLocalContext.transfer();
+      invokeMethodAsync(ugi, (List<FederationNamenodeContext>) namenodes,
+          useObserver, protocol, method, params);
+    }, asyncRouterHandler);
+    return null;
+  }
+
+  /**
+   * Asynchronously invokes a method on the specified NameNodes for a given user and operation.
+   * This method is responsible for the actual execution of the remote method call on the
+   * NameNodes in a non-blocking manner, allowing for concurrent processing.
+   *
+   * <p>In case of exceptions, the method includes logic to handle retries, failover to standby
+   * NameNodes, and proper exception handling to ensure that the calling code can respond
+   * appropriately to different error conditions.
+   *
+   * @param ugi The user information under which the method is to be invoked.
+   * @param namenodes The list of NameNode contexts on which the method will be invoked.
+   * @param useObserver Whether to use an observer node for the invocation if available.
+   * @param protocol The protocol class defining the method to be invoked.
+   * @param method The method to be invoked on the NameNodes.
+   * @param params The parameters for the method invocation.
+   */
+  private void invokeMethodAsync(
+      final UserGroupInformation ugi,
+      final List<FederationNamenodeContext> namenodes,
+      boolean useObserver,
+      final Class<?> protocol, final Method method, final Object... params) {
+
+    addClientInfoToCallerContext(ugi);
+    if (rpcMonitor != null) {
+      rpcMonitor.proxyOp();
+    }
+    final ExecutionStatus status = new ExecutionStatus(false, useObserver);
+    Map<FederationNamenodeContext, IOException> ioes = new LinkedHashMap<>();
+    final ConnectionContext[] connection = new ConnectionContext[1];
+    asyncForEach(namenodes.iterator(),
+        (foreach, namenode) -> {
+          if (!status.isShouldUseObserver()
+              && (namenode.getState() == FederationNamenodeServiceState.OBSERVER)) {
+            asyncComplete(null);
+            return;
+          }
+          String nsId = namenode.getNameserviceId();
+          String rpcAddress = namenode.getRpcAddress();
+          asyncTry(() -> {
+            connection[0] = getConnection(ugi, nsId, rpcAddress, protocol);
+            NameNodeProxiesClient.ProxyAndInfo<?> client = connection[0].getClient();
+            invoke(namenode, status.isShouldUseObserver(), 0, method,
+                  client.getProxy(), params);
+            asyncApply(res -> {
+              status.setComplete(true);
+              postProcessResult(method, status, namenode, nsId, client);
+              foreach.breakNow();
+              return res;
+            });
+          });
+          asyncCatch((res, ioe) -> {
+            ioes.put(namenode, ioe);
+            handleInvokeMethodIOException(namenode, ioe, status, useObserver);
+            return res;
+          }, IOException.class);
+          asyncFinally(res -> {
+            if (connection[0] != null) {
+              connection[0].release();
+            }
+            return res;
+          });
+        });
+
+    asyncApply(res -> {
+      if (status.isComplete()) {
+        return res;
+      }
+      return handlerAllNamenodeFail(namenodes, method, ioes, params);
+    });
+  }
+
+  /**
+   * Asynchronously invokes a method on a specified NameNode in the context of the given
+   * namespace and NameNode information. This method is designed to handle the invocation
+   * in a non-blocking manner, allowing for improved performance and scalability when
+   * interacting with the NameNode.
+   *
+   * @param namenode The context information for the NameNode.
+   * @param listObserverFirst Whether to list the observer node first in the invocation list.
+   * @param retryCount The current retry count for the operation.
+   * @param method The method to be invoked on the NameNode.
+   * @param obj The proxy object through which the method will be invoked.
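+   * @param params The parameters for the method invocation.
+   * @return Always {@code null}; failures are passed to the async helpers
+   *         ({@code asyncCatch}/{@code asyncThrowException}) instead of being returned.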
+   */
+  protected Object invoke(
+      FederationNamenodeContext namenode, Boolean listObserverFirst,
+      int retryCount, final Method method,
+      final Object obj, final Object... params) throws IOException {
+    try {
+      Client.setAsynchronousMode(true);
+      method.invoke(obj, params);
+      Client.setAsynchronousMode(false);
+      asyncCatch((AsyncCatchFunction<Object, Throwable>) (o, e) -> {
+        handlerInvokeException(namenode, listObserverFirst,
+            retryCount, method, obj, e, params);
+      }, Throwable.class);
+    } catch (InvocationTargetException e) {
+      asyncThrowException(e.getCause());
+    } catch (IllegalAccessException | IllegalArgumentException e) {
+      LOG.error("Unexpected exception while proxying API", e);
+      asyncThrowException(e);
+    }
+    return null;
+  }
+
+  /**
+   * Invokes sequential proxy calls to different locations. Continues to invoke
+   * calls until the success condition is met, or until all locations have been
+   * attempted.
+   *
+   * The success condition may be specified by:
+   * <ul>
+   * <li>An expected result class
+   * <li>An expected result value
+   * </ul>
+   *
+   * If no expected result class/values are specified, the success condition is
+   * a call that does not throw a remote exception.
+   *
+   * @param <T> The type of the remote method return.
+   * @param locations List of locations/nameservices to call sequentially, in list order.
+   * @param remoteMethod The remote method and parameters to invoke.
+   * @param expectedResultClass In order to be considered a positive result, the
+   *          return type must be of this class.
+   * @param expectedResultValue In order to be considered a positive result, the
+   *          return value must equal the value of this object.
+   * @return The result of the first successful call, or if no calls are
+   *         successful, the result of the first RPC call executed.
+   * @throws IOException If no call meets the success condition and at least one
+   *                     call failed; one of the recorded exceptions is rethrown.
+   */
+  @Override
+  public <T> T invokeSequential(
+      final List<? extends RemoteLocationContext> locations,
+      final RemoteMethod remoteMethod, Class<T> expectedResultClass,
+      Object expectedResultValue) throws IOException {
+    invokeSequential(remoteMethod, locations, expectedResultClass, expectedResultValue);
+    asyncApply((ApplyFunction<RemoteResult, Object>) RemoteResult::getResult);
+    return asyncReturn(expectedResultClass);
+  }
+
+  /**
+   * Invokes sequential proxy calls to different locations. Continues to invoke
+   * calls until the success condition is met, or until all locations have been
+   * attempted.
+   *
+   * The success condition may be specified by:
+   * <ul>
+   * <li>An expected result class
+   * <li>An expected result value
+   * </ul>
+   *
+   * If no expected result class/values are specified, the success condition is
+   * a call that does not throw a remote exception.
+   *
+   * This returns RemoteResult, which contains the invoked location as well
+   * as the result.
+   *
+   * @param <R> The type of the remote location.
+   * @param <T> The type of the remote method return.
+   * @param remoteMethod The remote method and parameters to invoke.
+   * @param locations List of locations/nameservices to call sequentially, in list order.
+   * @param expectedResultClass In order to be considered a positive result, the
+   *          return type must be of this class.
+   * @param expectedResultValue In order to be considered a positive result, the
+   *          return value must equal the value of this object.
+   * @return The result of the first successful call, or if no calls are
+   *         successful, the result of the first RPC call executed, along with
+   *         the invoked location in form of RemoteResult.
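+   * @throws IOException If no call meets the success condition and at least one
+   *                     call failed. An exception indicating an unavailable
+   *                     subcluster is rethrown if present, otherwise the first
+   *                     exception recorded.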
+   */
+  @Override
+  public <R extends RemoteLocationContext, T> RemoteResult invokeSequential(
+      final RemoteMethod remoteMethod, final List<R> locations,
+      Class<T> expectedResultClass, Object expectedResultValue)
+      throws IOException {
+
+    RouterRpcFairnessPolicyController controller = getRouterRpcFairnessPolicyController();
+    final UserGroupInformation ugi = RouterRpcServer.getRemoteUser();
+    final Method m = remoteMethod.getMethod();
+    List<IOException> thrownExceptions = new ArrayList<>();
+    final Object[] firstResult = {null};
+    final ExecutionStatus status = new ExecutionStatus();
+    Iterator<RemoteLocationContext> locationIterator =
+        (Iterator<RemoteLocationContext>) locations.iterator();
+    // Invoke in priority order
+    asyncForEach(locationIterator,
+        (foreach, loc) -> {
+          String ns = loc.getNameserviceId();
+          boolean isObserverRead = isObserverReadEligible(ns, m);
+          List<? extends FederationNamenodeContext> namenodes =
+              getOrderedNamenodes(ns, isObserverRead);
+          acquirePermit(ns, ugi, remoteMethod, controller);
+          asyncTry(() -> {
+            Class<?> proto = remoteMethod.getProtocol();
+            Object[] params = remoteMethod.getParams(loc);
+            invokeMethod(ugi, namenodes, isObserverRead, proto, m, params);
+            asyncApply(result -> {
+              // Check if the result is what we expected
+              if (isExpectedClass(expectedResultClass, result) &&
+                  isExpectedValue(expectedResultValue, result)) {
+                // Valid result, stop here
+                @SuppressWarnings("unchecked") R location = (R) loc;
+                @SuppressWarnings("unchecked") T ret = (T) result;
+                foreach.breakNow();
+                status.setComplete(true);
+                return new RemoteResult<>(location, ret);
+              }
+              if (firstResult[0] == null) {
+                firstResult[0] = result;
+              }
+              return null;
+            });
+          });
+          asyncCatch((ret, e) -> {
+            if (e instanceof IOException) {
+              IOException ioe = (IOException) e;
+              // Localize the exception
+              ioe = processException(ioe, loc);
+              // Record it and move on
+              thrownExceptions.add(ioe);
+            } else {
+              // Unusual error, ClientProtocol calls always use IOException (or
+              // RemoteException). Re-wrap in IOException for compatibility with
+              // ClientProtocol.
+              LOG.error("Unexpected exception {} proxying {} to {}",
+                  e.getClass(), m.getName(), ns, e);
+              IOException ioe = new IOException(
+                  "Unexpected exception proxying API " + e.getMessage(), e);
+              thrownExceptions.add(ioe);
+            }
+            return ret;
+          }, Exception.class);
+          asyncFinally(ret -> {
+            releasePermit(ns, ugi, remoteMethod, controller);
+            return ret;
+          });
+        });
+    asyncApply(result -> {
+      if (status.isComplete()) {
+        return result;
+      }
+      if (!thrownExceptions.isEmpty()) {
+        // An unavailable subcluster may be the actual cause
+        // We cannot surface other exceptions (e.g., FileNotFoundException)
+        for (int i = 0; i < thrownExceptions.size(); i++) {
+          IOException ioe = thrownExceptions.get(i);
+          if (isUnavailableException(ioe)) {
+            throw ioe;
+          }
+        }
+        // re-throw the first exception thrown for compatibility
+        throw thrownExceptions.get(0);
+      }
+      // Return the first result, whether it is the value or not
+      @SuppressWarnings("unchecked") T ret = (T) firstResult[0];
+      return new RemoteResult<>(locations.get(0), ret);
+    });
+    return asyncReturn(RemoteResult.class);
+  }
+
+  /**
+   * Invokes multiple concurrent proxy calls to different clients and maps
+   * each location to its result.
+   * <p>
+   * The work is delegated to the list-returning {@code invokeConcurrent}
+   * overload; an apply step is then registered that converts the list of
+   * {@link RemoteResult} into a map through {@code postProcessResult}.
+   * <p>
+   * Re-throws exceptions generated by the remote RPC call as either
+   * RemoteException or IOException.
+   *
+   * @param <T> The type of the remote location.
+   * @param <R> The type of the remote method return.
+   * @param locations List of remote locations to call concurrently.
+   * @param method The remote method and parameters to invoke.
+   * @param requireResponse If true an exception will be thrown if all calls do
+   *          not complete. If false exceptions are ignored and all data results
+   *          successfully received are returned.
+   * @param standby If the requests should go to the standby namenodes too.
+   * @param timeOutMs Timeout for each individual call.
+   * @param clazz Type of the remote return type.
+   * @return Result of invoking the method per subcluster: nsId to result.
+   * @throws IOException If requireResponse=true and any of the calls throw an
+   *           exception.
+   */
+  @Override
+  public <T extends RemoteLocationContext, R> Map<T, R> invokeConcurrent(
+      final Collection<T> locations, final RemoteMethod method,
+      boolean requireResponse, boolean standby, long timeOutMs, Class<R> clazz)
+      throws IOException {
+    invokeConcurrent(locations, method, standby, timeOutMs, clazz);
+    final ApplyFunction<List<RemoteResult<T, R>>, Object> toLocationMap =
+        remoteResults -> postProcessResult(requireResponse, remoteResults);
+    asyncApply(toLocationMap);
+    return asyncReturn(Map.class);
+  }
+
+  /**
+   * Invokes multiple concurrent proxy calls to different clients. Returns an
+   * array of results.
+   *
+   * @param <T> The type of the remote location.
+   * @param <R> The type of the remote method return.
+   * @param method The remote method and parameters to invoke.
+   * @param timeOutMs Timeout for each individual call.
+   * @param controller Fairness manager to control handlers assigned per NS.
+   * @param orderedLocations List of remote locations to call concurrently.
+   * @param callables Invoke method for each NameNode.
+   * @return Result of invoking the method per subcluster (list of results),
+   *         This includes the exception for each remote location.
+   * @throws IOException If there are errors invoking the method.
+   */
+  @Override
+  protected <T extends RemoteLocationContext, R> List<RemoteResult<T, R>> getRemoteResults(
+      RemoteMethod method, long timeOutMs, RouterRpcFairnessPolicyController controller,
+      List<T> orderedLocations, List<Callable<Object>> callables) throws IOException {
+    final UserGroupInformation ugi = RouterRpcServer.getRemoteUser();
+    final Method m = method.getMethod();
+    final CompletableFuture<Object>[] futures =
+        new CompletableFuture[callables.size()];
+    int i = 0;
+    for (Callable<Object> callable : callables) {
+      CompletableFuture<Object> future = null;
+      try {
+        callable.call();
+        future = getCompletableFuture();
+      } catch (Exception e) {
+        future = new CompletableFuture<>();
+        future.completeExceptionally(warpCompletionException(e));
+      }
+      futures[i++] = future;
+    }
+
+    asyncCompleteWith(CompletableFuture.allOf(futures)
+        .handle((unused, throwable) -> {
+          try {
+            return processFutures(method, m, orderedLocations, Arrays.asList(futures));
+          } catch (InterruptedException e) {
+            LOG.error("Unexpected error while invoking API: {}", e.getMessage());
+            throw warpCompletionException(new IOException(
+                "Unexpected error while invoking API " + e.getMessage(), e));
+          } finally {
+            releasePermit(CONCURRENT_NS, ugi, method, controller);
+          }
+        }));
+    return asyncReturn(List.class);
+  }
+
+  /**
+   * Invokes a ClientProtocol method against the nameservice of one location
+   * and wraps the outcome in a single-element list.
+   * <p>
+   * A permit for the nameservice is acquired before the invocation is set up
+   * and released in the finally step. An IOException raised by the invocation
+   * is passed through {@code processException} for the location before being
+   * re-thrown.
+   * <p>
+   * Re-throws exceptions generated by the remote RPC call as either
+   * RemoteException or IOException.
+   *
+   * @param <T> The type of the remote location.
+   * @param <R> The type of the remote method return.
+   * @param location RemoteLocation to invoke.
+   * @param method The remote method and parameters to invoke.
+   * @return Result of invoking the method per subcluster (list of results),
+   *         This includes the exception for each remote location.
+   * @throws IOException If there are errors invoking the method.
+   */
+  @Override
+  public <T extends RemoteLocationContext, R> List<RemoteResult<T, R>> invokeSingle(
+      T location, RemoteMethod method) throws IOException {
+    final UserGroupInformation ugi = RouterRpcServer.getRemoteUser();
+    final Method m = method.getMethod();
+    final String ns = location.getNameserviceId();
+    final boolean observerRead = isObserverReadEligible(ns, m);
+    final List<? extends FederationNamenodeContext> orderedNamenodes =
+        getOrderedNamenodes(ns, observerRead);
+    final RouterRpcFairnessPolicyController controller =
+        getRouterRpcFairnessPolicyController();
+    acquirePermit(ns, ugi, method, controller);
+    asyncTry(() -> {
+      invokeMethod(ugi, orderedNamenodes, observerRead,
+          method.getProtocol(), m, method.getParams(location));
+      asyncApply((ApplyFunction<R, Object>) value -> {
+        RemoteResult<T, R> wrapped = new RemoteResult<>(location, value);
+        return Collections.singletonList(wrapped);
+      });
+    });
+    asyncCatch((ignored, ioe) -> {
+      // Localize the exception
+      throw processException(ioe, location);
+    }, IOException.class);
+    asyncFinally(outcome -> {
+      releasePermit(ns, ugi, method, controller);
+      return outcome;
+    });
+    return asyncReturn(List.class);
+  }
+
+  /**
+   * Invokes a ClientProtocol method against the specified namespace, using a
+   * root ("/") location to resolve the method parameters.
+   * <p>
+   * A permit for the namespace is acquired before the invocation is set up
+   * and released in the finally step.
+   * <p>
+   * Re-throws exceptions generated by the remote RPC call as either
+   * RemoteException or IOException.
+   *
+   * @param nsId Target namespace for the method.
+   * @param method The remote method and parameters to invoke.
+   * @return Always {@code null} from the direct call. NOTE(review): the
+   *         outcome of the invocation is presumably delivered through the
+   *         async steps registered here - confirm against AsyncUtil.
+   * @throws IOException If the invoke generated an error.
+   */
+  @Override
+  public Object invokeSingle(final String nsId, RemoteMethod method)
+      throws IOException {
+    final UserGroupInformation ugi = RouterRpcServer.getRemoteUser();
+    final RouterRpcFairnessPolicyController controller =
+        getRouterRpcFairnessPolicyController();
+    acquirePermit(nsId, ugi, method, controller);
+    asyncTry(() -> {
+      final boolean observerRead = isObserverReadEligible(nsId, method.getMethod());
+      final List<? extends FederationNamenodeContext> orderedNamenodes =
+          getOrderedNamenodes(nsId, observerRead);
+      final RemoteLocationContext rootLocation = new RemoteLocation(nsId, "/", "/");
+      invokeMethod(ugi, orderedNamenodes, observerRead,
+          method.getProtocol(), method.getMethod(), method.getParams(rootLocation));
+    });
+    asyncFinally(outcome -> {
+      releasePermit(nsId, ugi, method, controller);
+      return outcome;
+    });
+    return null;
+  }
+
+  /**
+   * Invokes a single proxy call for a single location by running the
+   * sequential invocation over a one-element list of locations.
+   * <p>
+   * Re-throws exceptions generated by the remote RPC call as either
+   * RemoteException or IOException.
+   *
+   * @param location RemoteLocation to invoke.
+   * @param remoteMethod The remote method and parameters to invoke.
+   * @param clazz Class for the return type.
+   * @param <T> The type of the remote method return.
+   * @return The result of invoking the method if successful.
+   * @throws IOException If the invoke generated an error.
+   */
+  public <T> T invokeSingle(
+      final RemoteLocationContext location,
+      RemoteMethod remoteMethod, Class<T> clazz) throws IOException {
+    invokeSequential(
+        Collections.<RemoteLocationContext>singletonList(location), remoteMethod);
+    return asyncReturn(clazz);
+  }
+}

+ 388 - 173
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java

@@ -400,7 +400,7 @@ public class RouterRpcClient {
    *         NN + current user.
    *         NN + current user.
    * @throws IOException If we cannot get a connection to the NameNode.
    * @throws IOException If we cannot get a connection to the NameNode.
    */
    */
-  private ConnectionContext getConnection(UserGroupInformation ugi, String nsId,
+  protected ConnectionContext getConnection(UserGroupInformation ugi, String nsId,
       String rpcAddress, Class<?> proto) throws IOException {
       String rpcAddress, Class<?> proto) throws IOException {
     ConnectionContext connection = null;
     ConnectionContext connection = null;
     try {
     try {
@@ -462,7 +462,7 @@ public class RouterRpcClient {
    * @return Retry decision.
    * @return Retry decision.
    * @throws IOException An IO Error occurred.
    * @throws IOException An IO Error occurred.
    */
    */
-  private RetryDecision shouldRetry(
+  protected RetryDecision shouldRetry(
       final IOException ioe, final int retryCount, final String nsId,
       final IOException ioe, final int retryCount, final String nsId,
       final FederationNamenodeContext namenode,
       final FederationNamenodeContext namenode,
       final boolean listObserverFirst) throws IOException {
       final boolean listObserverFirst) throws IOException {
@@ -526,11 +526,12 @@ public class RouterRpcClient {
     if (rpcMonitor != null) {
     if (rpcMonitor != null) {
       rpcMonitor.proxyOp();
       rpcMonitor.proxyOp();
     }
     }
-    boolean failover = false;
-    boolean shouldUseObserver = useObserver;
+
+    ExecutionStatus status = new ExecutionStatus(false, useObserver);
     Map<FederationNamenodeContext, IOException> ioes = new LinkedHashMap<>();
     Map<FederationNamenodeContext, IOException> ioes = new LinkedHashMap<>();
     for (FederationNamenodeContext namenode : namenodes) {
     for (FederationNamenodeContext namenode : namenodes) {
-      if (!shouldUseObserver && (namenode.getState() == FederationNamenodeServiceState.OBSERVER)) {
+      if (!status.isShouldUseObserver()
+          && (namenode.getState() == FederationNamenodeServiceState.OBSERVER)) {
         continue;
         continue;
       }
       }
       ConnectionContext connection = null;
       ConnectionContext connection = null;
@@ -541,83 +542,12 @@ public class RouterRpcClient {
         ProxyAndInfo<?> client = connection.getClient();
         ProxyAndInfo<?> client = connection.getClient();
         final Object proxy = client.getProxy();
         final Object proxy = client.getProxy();
 
 
-        ret = invoke(nsId, namenode, useObserver, 0, method, proxy, params);
-        if (failover &&
-            FederationNamenodeServiceState.OBSERVER != namenode.getState()) {
-          // Success on alternate server, update
-          InetSocketAddress address = client.getAddress();
-          namenodeResolver.updateActiveNamenode(nsId, address);
-        }
-        if (this.rpcMonitor != null) {
-          this.rpcMonitor.proxyOpComplete(true, nsId, namenode.getState());
-        }
-        if (this.router.getRouterClientMetrics() != null) {
-          this.router.getRouterClientMetrics().incInvokedMethod(method);
-        }
+        ret = invoke(namenode, useObserver, 0, method, proxy, params);
+        postProcessResult(method, status, namenode, nsId, client);
         return ret;
         return ret;
       } catch (IOException ioe) {
       } catch (IOException ioe) {
         ioes.put(namenode, ioe);
         ioes.put(namenode, ioe);
-        if (ioe instanceof ObserverRetryOnActiveException) {
-          LOG.info("Encountered ObserverRetryOnActiveException from {}."
-                  + " Retry active namenode directly.", namenode);
-          shouldUseObserver = false;
-        } else if (ioe instanceof StandbyException) {
-          // Fail over indicated by retry policy and/or NN
-          if (this.rpcMonitor != null) {
-            this.rpcMonitor.proxyOpFailureStandby(nsId);
-          }
-          failover = true;
-        } else if (isUnavailableException(ioe)) {
-          if (this.rpcMonitor != null) {
-            this.rpcMonitor.proxyOpFailureCommunicate(nsId);
-          }
-          if (FederationNamenodeServiceState.OBSERVER == namenode.getState()) {
-            namenodeResolver.updateUnavailableNamenode(nsId,
-                NetUtils.createSocketAddr(namenode.getRpcAddress()));
-          } else {
-            failover = true;
-          }
-        } else if (ioe instanceof RemoteException) {
-          if (this.rpcMonitor != null) {
-            this.rpcMonitor.proxyOpComplete(true, nsId, namenode.getState());
-          }
-          RemoteException re = (RemoteException) ioe;
-          ioe = re.unwrapRemoteException();
-          ioe = getCleanException(ioe);
-          // RemoteException returned by NN
-          throw ioe;
-        } else if (ioe instanceof ConnectionNullException) {
-          if (this.rpcMonitor != null) {
-            this.rpcMonitor.proxyOpFailureCommunicate(nsId);
-          }
-          LOG.error("Get connection for {} {} error: {}", nsId, rpcAddress,
-              ioe.getMessage());
-          // Throw StandbyException so that client can retry
-          StandbyException se = new StandbyException(ioe.getMessage());
-          se.initCause(ioe);
-          throw se;
-        } else if (ioe instanceof NoNamenodesAvailableException) {
-          IOException cause = (IOException) ioe.getCause();
-          if (this.rpcMonitor != null) {
-            this.rpcMonitor.proxyOpNoNamenodes(nsId);
-          }
-          LOG.error("Cannot get available namenode for {} {} error: {}",
-              nsId, rpcAddress, ioe.getMessage());
-          // Rotate cache so that client can retry the next namenode in the cache
-          if (shouldRotateCache(cause)) {
-            this.namenodeResolver.rotateCache(nsId, namenode, useObserver);
-          }
-          // Throw RetriableException so that client can retry
-          throw new RetriableException(ioe);
-        } else {
-          // Other communication error, this is a failure
-          // Communication retries are handled by the retry policy
-          if (this.rpcMonitor != null) {
-            this.rpcMonitor.proxyOpFailureCommunicate(nsId);
-            this.rpcMonitor.proxyOpComplete(false, nsId, namenode.getState());
-          }
-          throw ioe;
-        }
+        handleInvokeMethodIOException(namenode, ioe, status, useObserver);
       } finally {
       } finally {
         if (connection != null) {
         if (connection != null) {
           connection.release();
           connection.release();
@@ -628,6 +558,24 @@ public class RouterRpcClient {
       this.rpcMonitor.proxyOpComplete(false, null, null);
       this.rpcMonitor.proxyOpComplete(false, null, null);
     }
     }
 
 
+    return handlerAllNamenodeFail(namenodes, method, ioes, params);
+  }
+
+  /**
+   * All namenodes cannot successfully process the RPC request,
+   * throw corresponding exceptions according to the exception type of each namenode.
+   *
+   * @param namenodes A prioritized list of namenodes within the same nameservice.
+   * @param method Remote ClientProtocol method to invoke.
+   * @param ioes The exception type of each namenode.
+   * @param params Variable list of parameters matching the method.
+   * @return null
+   * @throws IOException Corresponding IOException according to the
+   *                     exception type of each namenode.
+   */
+  protected Object handlerAllNamenodeFail(
+      List<? extends FederationNamenodeContext> namenodes, Method method,
+      Map<FederationNamenodeContext, IOException> ioes, Object[] params) throws IOException {
     // All namenodes were unavailable or in standby
     // All namenodes were unavailable or in standby
     String msg = "No namenode available to invoke " + method.getName() + " " +
     String msg = "No namenode available to invoke " + method.getName() + " " +
         Arrays.deepToString(params) + " in " + namenodes + " from " +
         Arrays.deepToString(params) + " in " + namenodes + " from " +
@@ -658,14 +606,120 @@ public class RouterRpcClient {
     }
     }
   }
   }
 
 
+  /**
+   * The RPC request is successfully processed by the NameNode, the NameNode status
+   * in the router cache is updated according to the ExecutionStatus.
+   *
+   * @param method Remote method to invoke.
+   * @param status Current execution status.
+   * @param namenode The namenode that successfully processed this RPC request.
+   * @param nsId Nameservice ID.
+   * @param client Connection client.
+   * @throws IOException If the state store cannot be accessed.
+   */
+  protected void postProcessResult(Method method, ExecutionStatus status,
+      FederationNamenodeContext namenode, String nsId, ProxyAndInfo<?> client) throws IOException {
+    if (status.isFailOver() &&
+        FederationNamenodeServiceState.OBSERVER != namenode.getState()) {
+      // Success on alternate server, update
+      InetSocketAddress address = client.getAddress();
+      namenodeResolver.updateActiveNamenode(nsId, address);
+    }
+    if (this.rpcMonitor != null) {
+      this.rpcMonitor.proxyOpComplete(true, nsId, namenode.getState());
+    }
+    if (this.router.getRouterClientMetrics() != null) {
+      this.router.getRouterClientMetrics().incInvokedMethod(method);
+    }
+  }
+
+  /**
+   * The RPC request to the NameNode throws an exception,
+   * handle it according to the type of exception.
+   *
+   * @param namenode The namenode that processed this RPC request.
+   * @param ioe The exception thrown by this RPC request.
+   * @param status The current execution status.
+   * @param useObserver Whether to use observer namenodes.
+   * @throws IOException If it cannot invoke the method.
+   */
+  protected void handleInvokeMethodIOException(final FederationNamenodeContext namenode,
+      IOException ioe, final ExecutionStatus status, boolean useObserver) throws IOException {
+    String nsId = namenode.getNameserviceId();
+    String rpcAddress = namenode.getRpcAddress();
+    if (ioe instanceof ObserverRetryOnActiveException) {
+      LOG.info("Encountered ObserverRetryOnActiveException from {}."
+          + " Retry active namenode directly.", namenode);
+      status.setShouldUseObserver(false);
+    } else if (ioe instanceof StandbyException) {
+      // Fail over indicated by retry policy and/or NN
+      if (this.rpcMonitor != null) {
+        this.rpcMonitor.proxyOpFailureStandby(nsId);
+      }
+      status.setFailOver(true);
+    } else if (isUnavailableException(ioe)) {
+      if (this.rpcMonitor != null) {
+        this.rpcMonitor.proxyOpFailureCommunicate(nsId);
+      }
+      if (FederationNamenodeServiceState.OBSERVER == namenode.getState()) {
+        namenodeResolver.updateUnavailableNamenode(nsId,
+            NetUtils.createSocketAddr(namenode.getRpcAddress()));
+      } else {
+        status.setFailOver(true);
+      }
+    } else if (ioe instanceof RemoteException) {
+      if (this.rpcMonitor != null) {
+        this.rpcMonitor.proxyOpComplete(true, nsId, namenode.getState());
+      }
+      RemoteException re = (RemoteException) ioe;
+      ioe = re.unwrapRemoteException();
+      ioe = getCleanException(ioe);
+      // RemoteException returned by NN
+      throw ioe;
+    } else if (ioe instanceof ConnectionNullException) {
+      if (this.rpcMonitor != null) {
+        this.rpcMonitor.proxyOpFailureCommunicate(nsId);
+      }
+      LOG.error("Get connection for {} {} error: {}", nsId, rpcAddress,
+          ioe.getMessage());
+      // Throw StandbyException so that client can retry
+      StandbyException se = new StandbyException(ioe.getMessage());
+      se.initCause(ioe);
+      throw se;
+    } else if (ioe instanceof NoNamenodesAvailableException) {
+      IOException cause = (IOException) ioe.getCause();
+      if (this.rpcMonitor != null) {
+        this.rpcMonitor.proxyOpNoNamenodes(nsId);
+      }
+      LOG.error("Cannot get available namenode for {} {} error: {}",
+          nsId, rpcAddress, ioe.getMessage());
+      // Rotate cache so that client can retry the next namenode in the cache
+      if (shouldRotateCache(cause)) {
+        this.namenodeResolver.rotateCache(nsId, namenode, useObserver);
+      }
+      // Throw RetriableException so that client can retry
+      throw new RetriableException(ioe);
+    } else {
+      // Other communication error, this is a failure
+      // Communication retries are handled by the retry policy
+      if (this.rpcMonitor != null) {
+        this.rpcMonitor.proxyOpFailureCommunicate(nsId);
+        this.rpcMonitor.proxyOpComplete(false, nsId, namenode.getState());
+      }
+      throw ioe;
+    }
+  }
+
   /**
   /**
    * For tracking some information about the actual client.
    * For tracking some information about the actual client.
    * It adds trace info "clientIp:ip", "clientPort:port",
    * It adds trace info "clientIp:ip", "clientPort:port",
    * "clientId:id", "clientCallId:callId" and "realUser:userName"
    * "clientId:id", "clientCallId:callId" and "realUser:userName"
    * in the caller context, removing the old values if they were
    * in the caller context, removing the old values if they were
    * already present.
    * already present.
+   *
+   * @param ugi User group information.
    */
    */
-  private void addClientInfoToCallerContext(UserGroupInformation ugi) {
+  protected void addClientInfoToCallerContext(UserGroupInformation ugi) {
     CallerContext ctx = CallerContext.getCurrent();
     CallerContext ctx = CallerContext.getCurrent();
     String origContext = ctx == null ? null : ctx.getContext();
     String origContext = ctx == null ? null : ctx.getContext();
     byte[] origSignature = ctx == null ? null : ctx.getSignature();
     byte[] origSignature = ctx == null ? null : ctx.getSignature();
@@ -706,7 +760,8 @@ public class RouterRpcClient {
    * Re-throws exceptions generated by the remote RPC call as either
    * Re-throws exceptions generated by the remote RPC call as either
    * RemoteException or IOException.
    * RemoteException or IOException.
    *
    *
-   * @param nsId Identifier for the namespace
+   * @param namenode namenode context.
+   * @param listObserverFirst Observer read case, observer NN will be ranked first.
    * @param retryCount Current retry times
    * @param retryCount Current retry times
    * @param method Method to invoke
    * @param method Method to invoke
    * @param obj Target object for the method
    * @param obj Target object for the method
@@ -714,8 +769,8 @@ public class RouterRpcClient {
    * @return Response from the remote server
    * @return Response from the remote server
    * @throws IOException If error occurs.
    * @throws IOException If error occurs.
    */
    */
-  private Object invoke(
-      String nsId, FederationNamenodeContext namenode, Boolean listObserverFirst,
+  protected Object invoke(
+      FederationNamenodeContext namenode, Boolean listObserverFirst,
       int retryCount, final Method method,
       int retryCount, final Method method,
       final Object obj, final Object... params) throws IOException {
       final Object obj, final Object... params) throws IOException {
     try {
     try {
@@ -725,36 +780,58 @@ public class RouterRpcClient {
       return null;
       return null;
     } catch (InvocationTargetException e) {
     } catch (InvocationTargetException e) {
       Throwable cause = e.getCause();
       Throwable cause = e.getCause();
-      if (cause instanceof IOException) {
-        IOException ioe = (IOException) cause;
-
-        // Check if we should retry.
-        RetryDecision decision = shouldRetry(ioe, retryCount, nsId, namenode, listObserverFirst);
-        if (decision == RetryDecision.RETRY) {
-          if (this.rpcMonitor != null) {
-            this.rpcMonitor.proxyOpRetries();
-          }
+      return handlerInvokeException(namenode, listObserverFirst,
+          retryCount, method, obj, cause, params);
+    }
+  }
 
 
-          // retry
-          return invoke(nsId, namenode, listObserverFirst, ++retryCount, method, obj, params);
-        } else if (decision == RetryDecision.FAILOVER_AND_RETRY) {
-          // failover, invoker looks for standby exceptions for failover.
-          if (ioe instanceof StandbyException) {
-            throw ioe;
-          } else if (isUnavailableException(ioe)) {
-            throw ioe;
-          } else {
-            throw new StandbyException(ioe.getMessage());
-          }
-        } else {
+  /**
+   * Handle the exception when an RPC request to the NameNode throws an exception.
+   *
+   * @param namenode namenode context.
+   * @param listObserverFirst Observer read case, observer NN will be ranked first.
+   * @param retryCount Current retry times
+   * @param method Method to invoke
+   * @param obj Target object for the method
+   * @param e The exception thrown by the current invocation.
+   * @param params Variable parameters
+   * @return Response from the remote server
+   * @throws IOException If error occurs.
+   */
+  protected Object handlerInvokeException(FederationNamenodeContext namenode,
+      Boolean listObserverFirst, int retryCount, Method method, Object obj,
+      Throwable e, Object[] params) throws IOException {
+    String nsId = namenode.getNameserviceId();
+    if (e instanceof IOException) {
+      IOException ioe = (IOException) e;
+
+      // Check if we should retry.
+      RetryDecision decision = shouldRetry(ioe, retryCount, nsId, namenode, listObserverFirst);
+      if (decision == RetryDecision.RETRY) {
+        if (this.rpcMonitor != null) {
+          this.rpcMonitor.proxyOpRetries();
+        }
+
+        // retry
+        return invoke(namenode, listObserverFirst, ++retryCount, method, obj, params);
+      } else if (decision == RetryDecision.FAILOVER_AND_RETRY) {
+        // failover, invoker looks for standby exceptions for failover.
+        if (ioe instanceof StandbyException) {
           throw ioe;
           throw ioe;
+        } else if (isUnavailableException(ioe)) {
+          throw ioe;
+        } else {
+          throw new StandbyException(ioe.getMessage());
         }
         }
       } else {
       } else {
-        throw new IOException(e);
+        throw ioe;
       }
       }
+    } else {
+      throw new IOException(e);
     }
     }
   }
   }
 
 
+
   /**
   /**
    * Check if the exception comes from an unavailable subcluster.
    * Check if the exception comes from an unavailable subcluster.
    * @param ioe IOException to check.
    * @param ioe IOException to check.
@@ -817,7 +894,7 @@ public class RouterRpcClient {
    * @param ioe Exception to clean up.
    * @param ioe Exception to clean up.
    * @return Copy of the original exception with a clean message.
    * @return Copy of the original exception with a clean message.
    */
    */
-  private static IOException getCleanException(IOException ioe) {
+  protected static IOException getCleanException(IOException ioe) {
     IOException ret = null;
     IOException ret = null;
 
 
     String msg = ioe.getMessage();
     String msg = ioe.getMessage();
@@ -1185,7 +1262,7 @@ public class RouterRpcClient {
    * @param loc Location we are processing.
    * @param loc Location we are processing.
    * @return Exception processed for federation.
    * @return Exception processed for federation.
    */
    */
-  private IOException processException(
+  protected IOException processException(
       IOException ioe, RemoteLocationContext loc) {
       IOException ioe, RemoteLocationContext loc) {
 
 
     if (ioe instanceof RemoteException) {
     if (ioe instanceof RemoteException) {
@@ -1251,7 +1328,7 @@ public class RouterRpcClient {
    * @return True if the result is an instance of the required class or if the
    * @return True if the result is an instance of the required class or if the
    *         expected class is null.
    *         expected class is null.
    */
    */
-  private static boolean isExpectedClass(Class<?> expectedClass, Object clazz) {
+  protected static boolean isExpectedClass(Class<?> expectedClass, Object clazz) {
     if (expectedClass == null) {
     if (expectedClass == null) {
       return true;
       return true;
     } else if (clazz == null) {
     } else if (clazz == null) {
@@ -1269,7 +1346,7 @@ public class RouterRpcClient {
    * @return True if the result is equals to the expected value or if the
    * @return True if the result is equals to the expected value or if the
    *         expected value is null.
    *         expected value is null.
    */
    */
-  private static boolean isExpectedValue(Object expectedValue, Object value) {
+  protected static boolean isExpectedValue(Object expectedValue, Object value) {
     if (expectedValue == null) {
     if (expectedValue == null) {
       return true;
       return true;
     } else if (value == null) {
     } else if (value == null) {
@@ -1414,7 +1491,26 @@ public class RouterRpcClient {
           throws IOException {
           throws IOException {
     final List<RemoteResult<T, R>> results = invokeConcurrent(
     final List<RemoteResult<T, R>> results = invokeConcurrent(
         locations, method, standby, timeOutMs, clazz);
         locations, method, standby, timeOutMs, clazz);
+    return postProcessResult(requireResponse, results);
+  }
 
 
+  /**
+   * Post-process the results returned by
+   * {@link RouterRpcClient#invokeConcurrent(Collection, RemoteMethod, boolean, long, Class)}.
+   *
+   * @param requireResponse If true an exception will be thrown if all calls do
+   *          not complete. If false exceptions are ignored and all data results
+   *          successfully received are returned.
+   * @param results Result of invoking the method per subcluster (list of results),
+   *                This includes the exception for each remote location.
+   * @return Result of invoking the method per subcluster: nsId to result.
+   * @param <T> The type of the remote location.
+   * @param <R> The type of the remote method return.
+   * @throws IOException If requiredResponse=true and any of the calls throw an
+   *                     exception.
+   */
+  protected static <T extends RemoteLocationContext, R> Map<T, R> postProcessResult(
+      boolean requireResponse, List<RemoteResult<T, R>> results) throws IOException {
     // Go over the results and exceptions
     // Go over the results and exceptions
     final Map<T, R> ret = new TreeMap<>();
     final Map<T, R> ret = new TreeMap<>();
     final List<IOException> thrownExceptions = new ArrayList<>();
     final List<IOException> thrownExceptions = new ArrayList<>();
@@ -1480,27 +1576,10 @@ public class RouterRpcClient {
       throw new IOException("No remote locations available");
       throw new IOException("No remote locations available");
     } else if (locations.size() == 1 && timeOutMs <= 0) {
     } else if (locations.size() == 1 && timeOutMs <= 0) {
       // Shortcut, just one call
       // Shortcut, just one call
-      T location = locations.iterator().next();
-      String ns = location.getNameserviceId();
-      boolean isObserverRead = isObserverReadEligible(ns, m);
-      final List<? extends FederationNamenodeContext> namenodes =
-          getOrderedNamenodes(ns, isObserverRead);
-      RouterRpcFairnessPolicyController controller = getRouterRpcFairnessPolicyController();
-      acquirePermit(ns, ugi, method, controller);
-      try {
-        Class<?> proto = method.getProtocol();
-        Object[] paramList = method.getParams(location);
-        R result = (R) invokeMethod(
-            ugi, namenodes, isObserverRead, proto, m, paramList);
-        RemoteResult<T, R> remoteResult = new RemoteResult<>(location, result);
-        return Collections.singletonList(remoteResult);
-      } catch (IOException ioe) {
-        // Localize the exception
-        throw processException(ioe, location);
-      } finally {
-        releasePermit(ns, ugi, method, controller);
-      }
+      return invokeSingle(locations.iterator().next(), method);
     }
     }
+    RouterRpcFairnessPolicyController controller = getRouterRpcFairnessPolicyController();
+    acquirePermit(CONCURRENT_NS, ugi, method, controller);
 
 
     List<T> orderedLocations = new ArrayList<>();
     List<T> orderedLocations = new ArrayList<>();
     List<Callable<Object>> callables = new ArrayList<>();
     List<Callable<Object>> callables = new ArrayList<>();
@@ -1551,8 +1630,29 @@ public class RouterRpcClient {
       this.router.getRouterClientMetrics().incInvokedConcurrent(m);
       this.router.getRouterClientMetrics().incInvokedConcurrent(m);
     }
     }
 
 
-    RouterRpcFairnessPolicyController controller = getRouterRpcFairnessPolicyController();
-    acquirePermit(CONCURRENT_NS, ugi, method, controller);
+    return getRemoteResults(method, timeOutMs, controller, orderedLocations, callables);
+  }
+
+  /**
+   * Invokes multiple concurrent proxy calls to different clients. Returns a
+   * list of results.
+   *
+   * @param <T> The type of the remote location.
+   * @param <R> The type of the remote method return.
+   * @param method The remote method and parameters to invoke.
+   * @param timeOutMs Timeout for each individual call.
+   * @param controller Fairness manager to control handlers assigned per NS.
+   * @param orderedLocations List of remote locations to call concurrently.
+   * @param callables Invoke method for each NameNode.
+   * @return Result of invoking the method per subcluster (list of results),
+   *         This includes the exception for each remote location.
+   * @throws IOException If there are errors invoking the method.
+   */
+  protected  <T extends RemoteLocationContext, R> List<RemoteResult<T, R>> getRemoteResults(
+      RemoteMethod method, long timeOutMs, RouterRpcFairnessPolicyController controller,
+      List<T> orderedLocations, List<Callable<Object>> callables) throws IOException {
+    final UserGroupInformation ugi = RouterRpcServer.getRemoteUser();
+    final Method m = method.getMethod();
     try {
     try {
       List<Future<Object>> futures = null;
       List<Future<Object>> futures = null;
       if (timeOutMs > 0) {
       if (timeOutMs > 0) {
@@ -1561,42 +1661,7 @@ public class RouterRpcClient {
       } else {
       } else {
         futures = executorService.invokeAll(callables);
         futures = executorService.invokeAll(callables);
       }
       }
-      List<RemoteResult<T, R>> results = new ArrayList<>();
-      for (int i=0; i<futures.size(); i++) {
-        T location = orderedLocations.get(i);
-        try {
-          Future<Object> future = futures.get(i);
-          R result = (R) future.get();
-          results.add(new RemoteResult<>(location, result));
-        } catch (CancellationException ce) {
-          T loc = orderedLocations.get(i);
-          String msg = "Invocation to \"" + loc + "\" for \""
-              + method.getMethodName() + "\" timed out";
-          LOG.error(msg);
-          IOException ioe = new SubClusterTimeoutException(msg);
-          results.add(new RemoteResult<>(location, ioe));
-        } catch (ExecutionException ex) {
-          Throwable cause = ex.getCause();
-          LOG.debug("Cannot execute {} in {}: {}",
-              m.getName(), location, cause.getMessage());
-
-          // Convert into IOException if needed
-          IOException ioe = null;
-          if (cause instanceof IOException) {
-            ioe = (IOException) cause;
-          } else {
-            ioe = new IOException("Unhandled exception while proxying API " +
-                m.getName() + ": " + cause.getMessage(), cause);
-          }
-
-          // Store the exceptions
-          results.add(new RemoteResult<>(location, ioe));
-        }
-      }
-      if (rpcMonitor != null) {
-        rpcMonitor.proxyOpComplete(true, CONCURRENT, null);
-      }
-      return results;
+      return processFutures(method, m, orderedLocations, futures);
     } catch (RejectedExecutionException e) {
     } catch (RejectedExecutionException e) {
       if (rpcMonitor != null) {
       if (rpcMonitor != null) {
         rpcMonitor.proxyOpFailureClientOverloaded();
         rpcMonitor.proxyOpFailureClientOverloaded();
@@ -1616,6 +1681,99 @@ public class RouterRpcClient {
     }
     }
   }
   }
 
 
+  /**
+   * Collects the outcome of every future produced by an invokeConcurrent call
+   * and pairs it with the location at the same index.
+   * <p>
+   * A cancelled future is reported as a {@link SubClusterTimeoutException} for
+   * its location. A failed future contributes its cause, wrapped in an
+   * IOException when it is not one already. Neither case aborts the loop, so
+   * the returned list has one entry per future. Once all futures have been
+   * drained, the RPC monitor (when present) is told that the concurrent proxy
+   * operation completed.
+   *
+   * @param <T> The type of the remote location.
+   * @param <R> The type of the remote method return.
+   * @param method The remote method and parameters to invoke.
+   * @param m The method to invoke.
+   * @param orderedLocations Remote locations, in the same order as the futures.
+   * @param futures All futures created during the invokeConcurrent call.
+   * @return One result per future, holding either the returned value or the
+   *         exception recorded for that remote location.
+   * @throws InterruptedException if the current thread was interrupted while waiting.
+   */
+  protected <T extends RemoteLocationContext, R> List<RemoteResult<T, R>> processFutures(
+      RemoteMethod method, Method m, final List<T> orderedLocations,
+      final List<Future<Object>> futures) throws InterruptedException{
+    final List<RemoteResult<T, R>> remoteResults = new ArrayList<>();
+    int index = 0;
+    for (Future<Object> pending : futures) {
+      final T target = orderedLocations.get(index++);
+      try {
+        R value = (R) pending.get();
+        remoteResults.add(new RemoteResult<>(target, value));
+      } catch (CancellationException cancelled) {
+        // A cancelled future is reported to the caller as a timeout.
+        final String timeoutMsg = "Invocation to \"" + target + "\" for \""
+            + method.getMethodName() + "\" timed out";
+        LOG.error(timeoutMsg);
+        final IOException timeout = new SubClusterTimeoutException(timeoutMsg);
+        remoteResults.add(new RemoteResult<>(target, timeout));
+      } catch (ExecutionException failed) {
+        final Throwable rootCause = failed.getCause();
+        LOG.debug("Cannot execute {} in {}: {}",
+            m.getName(), target, rootCause.getMessage());
+
+        // Anything that is not already an IOException gets wrapped in one.
+        final IOException recorded = (rootCause instanceof IOException)
+            ? (IOException) rootCause
+            : new IOException("Unhandled exception while proxying API " +
+                m.getName() + ": " + rootCause.getMessage(), rootCause);
+
+        // Keep the failure next to its location instead of throwing.
+        remoteResults.add(new RemoteResult<>(target, recorded));
+      }
+    }
+    if (rpcMonitor != null) {
+      rpcMonitor.proxyOpComplete(true, CONCURRENT, null);
+    }
+    return remoteResults;
+  }
+
+  /**
+   * Invokes a ClientProtocol method against the specified namespace.
+   * <p>
+   * Re-throws exceptions generated by the remote RPC call as either
+   * RemoteException or IOException.
+   * <p>
+   * The ordered namenode list is resolved first. A fairness permit for the
+   * location's nameservice is then acquired before the invocation and is
+   * released in a finally block whether the invocation returns or throws.
+   *
+   * @param <T> The type of the remote location.
+   * @param <R> The type of the remote method return.
+   * @param location RemoteLocation to invoke.
+   * @param method The remote method and parameters to invoke.
+   * @return A single-element list holding the result for the given location.
+   *         A failed invocation is not recorded in the list; its IOException
+   *         is passed through processException and thrown instead.
+   * @throws IOException If there are errors invoking the method.
+   */
+  public <T extends RemoteLocationContext, R> List<RemoteResult<T, R>> invokeSingle(
+      T location, RemoteMethod method) throws IOException {
+    final UserGroupInformation ugi = RouterRpcServer.getRemoteUser();
+    final Method m = method.getMethod();
+    String ns = location.getNameserviceId();
+    boolean isObserverRead = isObserverReadEligible(ns, m);
+    final List<? extends FederationNamenodeContext> namenodes =
+        getOrderedNamenodes(ns, isObserverRead);
+    RouterRpcFairnessPolicyController controller = getRouterRpcFairnessPolicyController();
+    acquirePermit(ns, ugi, method, controller);
+    try {
+      Class<?> proto = method.getProtocol();
+      Object[] paramList = method.getParams(location);
+      R result = (R) invokeMethod(
+          ugi, namenodes, isObserverRead, proto, m, paramList);
+      RemoteResult<T, R> remoteResult = new RemoteResult<>(location, result);
+      return Collections.singletonList(remoteResult);
+    } catch (IOException ioe) {
+      // Localize the exception for this location before rethrowing it
+      throw processException(ioe, location);
+    } finally {
+      releasePermit(ns, ugi, method, controller);
+    }
+  }
+
   /**
   /**
    * Transfer origin thread local context which is necessary to current
    * Transfer origin thread local context which is necessary to current
    * worker thread when invoking method concurrently by executor service.
    * worker thread when invoking method concurrently by executor service.
@@ -1624,7 +1782,7 @@ public class RouterRpcClient {
    * @param originContext origin CallerContext which should be transferred
    * @param originContext origin CallerContext which should be transferred
    *                      to server side.
    *                      to server side.
    */
    */
-  private void transferThreadLocalContext(
+  protected void transferThreadLocalContext(
       final Call originCall, final CallerContext originContext) {
       final Call originCall, final CallerContext originContext) {
     Server.getCurCall().set(originCall);
     Server.getCurCall().set(originCall);
     CallerContext.setCurrent(originContext);
     CallerContext.setCurrent(originContext);
@@ -1675,7 +1833,7 @@ public class RouterRpcClient {
    * @param controller fairness policy controller to acquire permit from
    * @param controller fairness policy controller to acquire permit from
    * @throws IOException If permit could not be acquired for the nsId.
    * @throws IOException If permit could not be acquired for the nsId.
    */
    */
-  private void acquirePermit(final String nsId, final UserGroupInformation ugi,
+  protected void acquirePermit(final String nsId, final UserGroupInformation ugi,
       final RemoteMethod m, RouterRpcFairnessPolicyController controller)
       final RemoteMethod m, RouterRpcFairnessPolicyController controller)
       throws IOException {
       throws IOException {
     if (controller != null) {
     if (controller != null) {
@@ -1708,7 +1866,7 @@ public class RouterRpcClient {
    * @param m Remote method that needs to be invoked.
    * @param m Remote method that needs to be invoked.
    * @param controller fairness policy controller to release permit from
    * @param controller fairness policy controller to release permit from
    */
    */
-  private void releasePermit(final String nsId, final UserGroupInformation ugi,
+  protected void releasePermit(final String nsId, final UserGroupInformation ugi,
       final RemoteMethod m, RouterRpcFairnessPolicyController controller) {
       final RemoteMethod m, RouterRpcFairnessPolicyController controller) {
     if (controller != null) {
     if (controller != null) {
       controller.releasePermit(nsId);
       controller.releasePermit(nsId);
@@ -1782,7 +1940,7 @@ public class RouterRpcClient {
    * @return A prioritized list of NNs to use for communication.
    * @return A prioritized list of NNs to use for communication.
    * @throws IOException If a NN cannot be located for the nameservice ID.
    * @throws IOException If a NN cannot be located for the nameservice ID.
    */
    */
-  private List<? extends FederationNamenodeContext> getOrderedNamenodes(String nsId,
+  protected List<? extends FederationNamenodeContext> getOrderedNamenodes(String nsId,
       boolean isObserverRead) throws IOException {
       boolean isObserverRead) throws IOException {
     final List<? extends FederationNamenodeContext> namenodes;
     final List<? extends FederationNamenodeContext> namenodes;
 
 
@@ -1802,7 +1960,7 @@ public class RouterRpcClient {
     return namenodes;
     return namenodes;
   }
   }
 
 
-  private boolean isObserverReadEligible(String nsId, Method method) {
+  protected boolean isObserverReadEligible(String nsId, Method method) {
     return isReadCall(method) && isNamespaceObserverReadEligible(nsId);
     return isReadCall(method) && isNamespaceObserverReadEligible(nsId);
   }
   }
 
 
@@ -1857,7 +2015,7 @@ public class RouterRpcClient {
    * {@link RouterRpcClient#isUnavailableException(IOException) unavailable exception},
    * {@link RouterRpcClient#isUnavailableException(IOException) unavailable exception},
    * otherwise false.
    * otherwise false.
    */
    */
-  private boolean shouldRotateCache(IOException ioe) {
+  protected boolean shouldRotateCache(IOException ioe) {
     if (isUnavailableException(ioe)) {
     if (isUnavailableException(ioe)) {
       return true;
       return true;
     }
     }
@@ -1868,4 +2026,61 @@ public class RouterRpcClient {
     }
     }
     return isUnavailableException(ioe);
     return isUnavailableException(ioe);
   }
   }
+
+
+  /**
+   * Compact holder for three independent boolean states of an execution:
+   * whether a failover should be attempted, whether an observer should be
+   * used, and whether the execution has completed.
+   *
+   * <p>The states are packed into a single byte, one bit each. Every state can
+   * be set and queried on its own without disturbing the other two. A new
+   * instance always starts as not complete.
+   */
+  protected static class ExecutionStatus {
+
+    private static final byte FAIL_OVER_BIT = 1;
+    private static final byte SHOULD_USE_OBSERVER_BIT = 2;
+    private static final byte COMPLETE_BIT = 4;
+
+    /** Bit set holding the state flags. */
+    private byte flag;
+
+    ExecutionStatus() {
+      this(false, false);
+    }
+
+    ExecutionStatus(boolean failOver, boolean shouldUseObserver) {
+      byte initial = 0;
+      if (failOver) {
+        initial |= FAIL_OVER_BIT;
+      }
+      if (shouldUseObserver) {
+        initial |= SHOULD_USE_OBSERVER_BIT;
+      }
+      // The complete bit is deliberately left cleared on construction.
+      this.flag = initial;
+    }
+
+    /** Turns the bits of {@code mask} on or off, leaving all other bits alone. */
+    private void assign(byte mask, boolean enabled) {
+      if (enabled) {
+        flag |= mask;
+      } else {
+        flag &= ~mask;
+      }
+    }
+
+    /** Tells whether any bit of {@code mask} is currently set. */
+    private boolean isSet(byte mask) {
+      return (flag & mask) != 0;
+    }
+
+    private void setFailOver(boolean failOver) {
+      assign(FAIL_OVER_BIT, failOver);
+    }
+
+    private void setShouldUseObserver(boolean shouldUseObserver) {
+      assign(SHOULD_USE_OBSERVER_BIT, shouldUseObserver);
+    }
+
+    void setComplete(boolean complete) {
+      assign(COMPLETE_BIT, complete);
+    }
+
+    boolean isFailOver() {
+      return isSet(FAIL_OVER_BIT);
+    }
+
+    boolean isShouldUseObserver() {
+      return isSet(SHOULD_USE_OBSERVER_BIT);
+    }
+
+    boolean isComplete() {
+      return isSet(COMPLETE_BIT);
+    }
+  }
 }
 }

+ 71 - 7
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java

@@ -26,6 +26,12 @@ import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_
 import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_READER_COUNT_KEY;
 import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_READER_COUNT_KEY;
 import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_READER_QUEUE_SIZE_DEFAULT;
 import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_READER_QUEUE_SIZE_DEFAULT;
 import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_READER_QUEUE_SIZE_KEY;
 import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_READER_QUEUE_SIZE_KEY;
+import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT;
+import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT_DEFAULT;
+import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT;
+import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT_DEFAULT;
+import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ENABLE_ASYNC;
+import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ENABLE_ASYNC_DEFAULT;
 import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DN_REPORT_CACHE_EXPIRE;
 import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DN_REPORT_CACHE_EXPIRE;
 import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DN_REPORT_CACHE_EXPIRE_MS_DEFAULT;
 import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DN_REPORT_CACHE_EXPIRE_MS_DEFAULT;
 import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FEDERATION_RENAME_OPTION;
 import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FEDERATION_RENAME_OPTION;
@@ -50,15 +56,19 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 import java.util.stream.Collectors;
 
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException;
 import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException;
 import org.apache.hadoop.hdfs.HAUtil;
 import org.apache.hadoop.hdfs.HAUtil;
 import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
 import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
+import org.apache.hadoop.hdfs.protocolPB.AsyncRpcProtocolPBUtil;
 import org.apache.hadoop.thirdparty.com.google.common.cache.CacheBuilder;
 import org.apache.hadoop.thirdparty.com.google.common.cache.CacheBuilder;
 import org.apache.hadoop.thirdparty.com.google.common.cache.CacheLoader;
 import org.apache.hadoop.thirdparty.com.google.common.cache.CacheLoader;
 import org.apache.hadoop.thirdparty.com.google.common.cache.LoadingCache;
 import org.apache.hadoop.thirdparty.com.google.common.cache.LoadingCache;
@@ -199,7 +209,8 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol,
 
 
   private static final Logger LOG =
   private static final Logger LOG =
       LoggerFactory.getLogger(RouterRpcServer.class);
       LoggerFactory.getLogger(RouterRpcServer.class);
-
+  private ExecutorService asyncRouterHandler;
+  private ExecutorService asyncRouterResponder;
 
 
   /** Configuration for the RPC server. */
   /** Configuration for the RPC server. */
   private Configuration conf;
   private Configuration conf;
@@ -256,6 +267,8 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol,
   private RouterRenameOption routerRenameOption;
   private RouterRenameOption routerRenameOption;
   /** Schedule the router federation rename jobs. */
   /** Schedule the router federation rename jobs. */
   private BalanceProcedureScheduler fedRenameScheduler;
   private BalanceProcedureScheduler fedRenameScheduler;
+  private boolean enableAsync;
+
   /**
   /**
    * Construct a router RPC server.
    * Construct a router RPC server.
    *
    *
@@ -285,6 +298,12 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol,
     int handlerQueueSize = this.conf.getInt(DFS_ROUTER_HANDLER_QUEUE_SIZE_KEY,
     int handlerQueueSize = this.conf.getInt(DFS_ROUTER_HANDLER_QUEUE_SIZE_KEY,
         DFS_ROUTER_HANDLER_QUEUE_SIZE_DEFAULT);
         DFS_ROUTER_HANDLER_QUEUE_SIZE_DEFAULT);
 
 
+    this.enableAsync = conf.getBoolean(DFS_ROUTER_RPC_ENABLE_ASYNC,
+        DFS_ROUTER_RPC_ENABLE_ASYNC_DEFAULT);
+    LOG.info("Router enable async {}", this.enableAsync);
+    if (this.enableAsync) {
+      initAsyncThreadPool();
+    }
     // Override Hadoop Common IPC setting
     // Override Hadoop Common IPC setting
     int readerQueueSize = this.conf.getInt(DFS_ROUTER_READER_QUEUE_SIZE_KEY,
     int readerQueueSize = this.conf.getInt(DFS_ROUTER_READER_QUEUE_SIZE_KEY,
         DFS_ROUTER_READER_QUEUE_SIZE_DEFAULT);
         DFS_ROUTER_READER_QUEUE_SIZE_DEFAULT);
@@ -393,15 +412,17 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol,
     }
     }
 
 
     // Create the client
     // Create the client
-    this.rpcClient = new RouterRpcClient(this.conf, this.router,
-        this.namenodeResolver, this.rpcMonitor, routerStateIdContext);
-
-    // Initialize modules
-    this.quotaCall = new Quota(this.router, this);
+    if (this.enableAsync) {
+      this.rpcClient = new RouterAsyncRpcClient(this.conf, this.router,
+          this.namenodeResolver, this.rpcMonitor, routerStateIdContext);
+    } else {
+      this.rpcClient = new RouterRpcClient(this.conf, this.router,
+          this.namenodeResolver, this.rpcMonitor, routerStateIdContext);
+    }
     this.nnProto = new RouterNamenodeProtocol(this);
     this.nnProto = new RouterNamenodeProtocol(this);
+    this.quotaCall = new Quota(this.router, this);
     this.clientProto = new RouterClientProtocol(conf, this);
     this.clientProto = new RouterClientProtocol(conf, this);
     this.routerProto = new RouterUserProtocol(this);
     this.routerProto = new RouterUserProtocol(this);
-
     long dnCacheExpire = conf.getTimeDuration(
     long dnCacheExpire = conf.getTimeDuration(
         DN_REPORT_CACHE_EXPIRE,
         DN_REPORT_CACHE_EXPIRE,
         DN_REPORT_CACHE_EXPIRE_MS_DEFAULT, TimeUnit.MILLISECONDS);
         DN_REPORT_CACHE_EXPIRE_MS_DEFAULT, TimeUnit.MILLISECONDS);
@@ -430,6 +451,27 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol,
     initRouterFedRename();
     initRouterFedRename();
   }
   }
 
 
+  /**
+   * Creates the router async handler and async responder thread pools.
+   * <p>
+   * Pool sizes are read from the configuration. A pool that already exists is
+   * left as it is, so calling this method again does not replace it. The
+   * responder pool is always handed to {@link AsyncRpcProtocolPBUtil} as its
+   * worker, whether it was created by this call or an earlier one.
+   */
+  protected void initAsyncThreadPool() {
+    final int handlerThreads = conf.getInt(DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT,
+        DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT_DEFAULT);
+    final int responderThreads = conf.getInt(DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT,
+        DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT_DEFAULT);
+
+    if (asyncRouterHandler == null) {
+      LOG.info("init router async handler count: {}", handlerThreads);
+      final ThreadFactory handlerFactory =
+          new AsyncThreadFactory("router async handler ");
+      asyncRouterHandler = Executors.newFixedThreadPool(handlerThreads, handlerFactory);
+    }
+
+    if (asyncRouterResponder == null) {
+      LOG.info("init router async responder count: {}", responderThreads);
+      final ThreadFactory responderFactory =
+          new AsyncThreadFactory("router async responder ");
+      asyncRouterResponder = Executors.newFixedThreadPool(responderThreads, responderFactory);
+    }
+
+    AsyncRpcProtocolPBUtil.setWorker(asyncRouterResponder);
+  }
+
   /**
   /**
    * Clear expired namespace in the shared RouterStateIdContext.
    * Clear expired namespace in the shared RouterStateIdContext.
    */
    */
@@ -2068,4 +2110,26 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol,
       return executorService.submit(() -> load(type));
       return executorService.submit(() -> load(type));
     }
     }
   }
   }
+
+  /**
+   * Tells whether this RPC server was configured for asynchronous processing.
+   *
+   * @return the value of the async-enable setting read in the constructor.
+   */
+  public boolean isAsync() {
+    return enableAsync;
+  }
+
+  /**
+   * Gets the executor backing the router async handlers.
+   *
+   * @return the async handler pool, or null if initAsyncThreadPool has not
+   *         created it.
+   */
+  public Executor getAsyncRouterHandler() {
+    return this.asyncRouterHandler;
+  }
+
+  /**
+   * Thread factory for the router async handler and responder pools.
+   * <p>
+   * Threads are named with the given prefix followed by a counter starting at
+   * 1, and are always created as daemon threads.
+   */
+  private static class AsyncThreadFactory implements ThreadFactory {
+    /** Prefix put in front of the running number in each thread name. */
+    private final String namePrefix;
+    /** Number used for the next thread created by this factory. */
+    private final AtomicInteger threadNumber = new AtomicInteger(1);
+
+    /**
+     * @param namePrefix Prefix for the names of all threads of this factory.
+     */
+    AsyncThreadFactory(String namePrefix) {
+      this.namePrefix = namePrefix;
+    }
+
+    /**
+     * Creates a named daemon thread for the given task.
+     *
+     * @param r Task the new thread will run.
+     * @return The new, not yet started thread.
+     */
+    @Override
+    public Thread newThread(Runnable r) {
+      Thread thread = new Thread(r, namePrefix + threadNumber.getAndIncrement());
+      // A new Thread inherits the daemon flag of whichever thread creates it.
+      // Set it explicitly so idle pool threads can never keep the JVM alive,
+      // regardless of which thread submitted the first task.
+      thread.setDaemon(true);
+      return thread;
+    }
+  }
 }
 }

+ 101 - 0
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ThreadLocalContext.java

@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import org.apache.hadoop.hdfs.server.federation.metrics.FederationRPCPerformanceMonitor;
+import org.apache.hadoop.ipc.CallerContext;
+import org.apache.hadoop.ipc.Server;
+
+/**
+ * The ThreadLocalContext class is designed to capture and transfer the context of a
+ * thread-local environment within the Hadoop Distributed File System (HDFS) federation
+ * router operations. This is particularly useful for preserving the state across
+ * asynchronous operations where the context needs to be maintained consistently.
+ *
+ * The context includes details such as the current call being processed by the server, the
+ * caller's context, and performance monitoring timestamps. By transferring this context,
+ * the class ensures that the operations performed on worker threads correctly reflect
+ * the state of the original calling thread.
+ *
+ * Here is a high-level overview of the main components captured by this context:
+ * <ul>
+ *     <li>{@link Server.Call} - Represents the current server call.</li>
+ *     <li>{@link CallerContext} - Stores information about the caller.</li>
+ *     <li>startOpTime - Time for an operation to be received in the Router.</li>
+ *     <li>proxyOpTime - Time for an operation to be sent to the Namenode.</li>
+ * </ul>
+ *
+ * This class is typically used in scenarios where asynchronous processing is involved, to
+ * ensure that the thread executing the asynchronous task has the correct context applied.
+ *
+ * @see Server
+ * @see CallerContext
+ * @see FederationRPCPerformanceMonitor
+ */
+public class ThreadLocalContext {
+
+  /** The server call that was current when this context was captured; may be null. */
+  private final Server.Call call;
+  /** The caller context that was current when this context was captured; may be null. */
+  private final CallerContext context;
+  /** Time for an operation to be received in the Router, as captured at construction. */
+  private final long startOpTime;
+  /** Time for an operation to be sent to the Namenode, as captured at construction. */
+  private final long proxyOpTime;
+
+  /**
+   * Constructs a new {@link  ThreadLocalContext} instance, capturing the current
+   * thread-local context at the point of creation.
+   * <p>
+   * The values are read through the static accessors of {@link Server},
+   * {@link CallerContext} and {@link FederationRPCPerformanceMonitor}, so the
+   * instance has to be created on the thread whose state should be carried over.
+   */
+  public ThreadLocalContext() {
+    this.call = Server.getCurCall().get();
+    this.context = CallerContext.getCurrent();
+    this.startOpTime = FederationRPCPerformanceMonitor.getStartOpTime();
+    this.proxyOpTime =  FederationRPCPerformanceMonitor.getProxyOpTime();
+  }
+
+  /**
+   * Transfers the captured context to the current thread. This method is used to apply
+   * the context to worker threads that are processing asynchronous tasks, ensuring
+   * that the task execution reflects the state of the original calling thread.
+   * <p>
+   * Only values that were present at capture time are applied: a null call, a null
+   * caller context, or a timestamp equal to -1 is skipped, and the current thread
+   * keeps whatever it already had for that item.
+   * NOTE(review): on a pooled worker thread this may leave state from an earlier
+   * task in place - confirm that this is intended.
+   */
+  public void transfer() {
+    if (call != null) {
+      Server.getCurCall().set(call);
+    }
+    if (context != null) {
+      CallerContext.setCurrent(context);
+    }
+    if (startOpTime != -1L) {
+      FederationRPCPerformanceMonitor.setStartOpTime(startOpTime);
+    }
+    if (proxyOpTime != -1L) {
+      FederationRPCPerformanceMonitor.setProxyOpTime(proxyOpTime);
+    }
+  }
+
+  @Override
+  public String toString() {
+    return "ThreadLocalContext{" +
+        "call=" + call +
+        ", context=" + context +
+        ", startOpTime=" + startOpTime +
+        ", proxyOpTime=" + proxyOpTime +
+        '}';
+  }
+}

+ 22 - 3
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/AsyncUtil.java

@@ -73,6 +73,7 @@ public final class AsyncUtil {
    *         <ul>
    *         <ul>
    *           <li>{@code false} if {@code clazz} is {@link Boolean},
    *           <li>{@code false} if {@code clazz} is {@link Boolean},
    *           <li>-1 if {@code clazz} is {@link Long},
    *           <li>-1 if {@code clazz} is {@link Long},
+   *           <li>-1 if {@code clazz} is {@link Integer},
    *           <li>{@code null} for any other type.
    *           <li>{@code null} for any other type.
    *         </ul>
    *         </ul>
    */
    */
@@ -80,11 +81,14 @@ public final class AsyncUtil {
     if (clazz == null) {
     if (clazz == null) {
       return null;
       return null;
     }
     }
-    if (clazz.equals(Boolean.class)) {
+    if (clazz.equals(Boolean.class)
+        || clazz.equals(boolean.class)) {
       return (R) BOOLEAN_RESULT;
       return (R) BOOLEAN_RESULT;
-    } else if (clazz.equals(Long.class)) {
+    } else if (clazz.equals(Long.class)
+        || clazz.equals(long.class)) {
       return (R) LONG_RESULT;
       return (R) LONG_RESULT;
-    } else if (clazz.equals(Integer.class)) {
+    } else if (clazz.equals(Integer.class)
+        || clazz.equals(int.class)) {
       return (R) INT_RESULT;
       return (R) INT_RESULT;
     }
     }
     return (R) NULL_RESULT;
     return (R) NULL_RESULT;
@@ -140,6 +144,12 @@ public final class AsyncUtil {
         CompletableFuture.completedFuture(value));
         CompletableFuture.completedFuture(value));
   }
   }
 
 
+  /**
+   * Completes the current asynchronous operation with the specified completableFuture.
+   *
+   * @param completableFuture The completableFuture to complete the future with.
+   * @param <R>    The type of the value to be completed.
+   */
   public static <R> void asyncCompleteWith(CompletableFuture<R> completableFuture) {
   public static <R> void asyncCompleteWith(CompletableFuture<R> completableFuture) {
     CUR_COMPLETABLE_FUTURE.set((CompletableFuture<Object>) completableFuture);
     CUR_COMPLETABLE_FUTURE.set((CompletableFuture<Object>) completableFuture);
   }
   }
@@ -384,4 +394,13 @@ public final class AsyncUtil {
         .handle((unused, throwable) -> then.apply(completableFutures));
         .handle((unused, throwable) -> then.apply(completableFutures));
     CUR_COMPLETABLE_FUTURE.set((CompletableFuture<Object>) result);
     CUR_COMPLETABLE_FUTURE.set((CompletableFuture<Object>) result);
   }
   }
+
+  /**
+   * Get the CompletableFuture object stored in the current thread's local variable,
+   * i.e. the value currently held by {@code CUR_COMPLETABLE_FUTURE}.
+   *
+   * NOTE(review): presumably {@code null} when nothing has been stored yet - confirm
+   * against the declaration of {@code CUR_COMPLETABLE_FUTURE}.
+   *
+   * @return The completableFuture object.
+   */
+  public static CompletableFuture<Object> getCompletableFuture() {
+    return CUR_COMPLETABLE_FUTURE.get();
+  }
 }
 }

+ 24 - 0
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml

@@ -48,6 +48,14 @@
     </description>
     </description>
   </property>
   </property>
 
 
+  <property>
+    <name>dfs.federation.router.rpc.async.enable</name>
+    <value>false</value>
+    <description>
+      If true, the router processes RPC requests asynchronously.
+    </description>
+  </property>
+
   <property>
   <property>
     <name>dfs.federation.router.rpc-address</name>
     <name>dfs.federation.router.rpc-address</name>
     <value>0.0.0.0:8888</value>
     <value>0.0.0.0:8888</value>
@@ -101,6 +109,22 @@
     </description>
     </description>
   </property>
   </property>
 
 
+  <property>
+    <name>dfs.federation.router.rpc.async.handler.count</name>
+    <value>2</value>
+    <description>
+      The number of async handlers the router uses to handle RPC client requests.
+    </description>
+  </property>
+
+  <property>
+    <name>dfs.federation.router.rpc.async.responder.count</name>
+    <value>10</value>
+    <description>
+      The number of async responders the router uses to handle responses.
+    </description>
+  </property>
+
   <property>
   <property>
     <name>dfs.federation.router.connection.creator.queue-size</name>
     <name>dfs.federation.router.connection.creator.queue-size</name>
     <value>100</value>
     <value>100</value>

+ 2 - 0
hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestAsyncRpcProtocolPBUtil.java

@@ -40,6 +40,7 @@ import org.slf4j.LoggerFactory;
 
 
 import java.io.IOException;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.InetSocketAddress;
+import java.util.concurrent.ForkJoinPool;
 
 
 import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.syncReturn;
 import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.syncReturn;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertEquals;
@@ -53,6 +54,7 @@ public class TestAsyncRpcProtocolPBUtil {
 
 
   @Before
   @Before
   public void setUp() throws IOException {
   public void setUp() throws IOException {
+    AsyncRpcProtocolPBUtil.setWorker(ForkJoinPool.commonPool());
     Configuration conf = new Configuration();
     Configuration conf = new Configuration();
     RPC.setProtocolEngine(conf, TestRpcBase.TestRpcService.class,
     RPC.setProtocolEngine(conf, TestRpcBase.TestRpcService.class,
         ProtobufRpcEngine2.class);
         ProtobufRpcEngine2.class);

+ 2 - 0
hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestRouterClientSideTranslatorPB.java

@@ -52,6 +52,7 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.InetSocketAddress;
 import java.util.EnumSet;
 import java.util.EnumSet;
 import java.util.List;
 import java.util.List;
+import java.util.concurrent.ForkJoinPool;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 
 import static org.apache.hadoop.crypto.CryptoProtocolVersion.ENCRYPTION_ZONES;
 import static org.apache.hadoop.crypto.CryptoProtocolVersion.ENCRYPTION_ZONES;
@@ -81,6 +82,7 @@ public class TestRouterClientSideTranslatorPB {
 
 
   @BeforeClass
   @BeforeClass
   public static void setUp() throws Exception {
   public static void setUp() throws Exception {
+    AsyncRpcProtocolPBUtil.setWorker(ForkJoinPool.commonPool());
     conf = new HdfsConfiguration();
     conf = new HdfsConfiguration();
     cluster = (new MiniDFSCluster.Builder(conf))
     cluster = (new MiniDFSCluster.Builder(conf))
         .numDataNodes(1).build();
         .numDataNodes(1).build();

+ 314 - 0
hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAsyncRpcClient.java

@@ -0,0 +1,314 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster;
+import org.apache.hadoop.hdfs.server.federation.MockResolver;
+import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
+import org.apache.hadoop.hdfs.server.federation.metrics.FederationRPCMetrics;
+import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeContext;
+import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
+import org.apache.hadoop.ipc.CallerContext;
+import org.apache.hadoop.ipc.RetriableException;
+import org.apache.hadoop.ipc.StandbyException;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.test.LambdaTestUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.hadoop.fs.permission.FsAction.ALL;
+import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.NAMENODES;
+import static org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.DEFAULT_HEARTBEAT_INTERVAL_MS;
+import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT;
+import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT;
+import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.syncReturn;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Used to test the functionality of {@link RouterAsyncRpcClient}.
+ */
+public class TestRouterAsyncRpcClient {
+  private static Configuration routerConf;
+  /** Federated HDFS cluster. */
+  private static MiniRouterDFSCluster cluster;
+  private static String ns0;
+  private static String ns1;
+
+  /** Random Router for this federated cluster. */
+  private MiniRouterDFSCluster.RouterContext router;
+  private FileSystem routerFs;
+  private RouterRpcServer routerRpcServer;
+  private RouterAsyncRpcClient asyncRpcClient;
+  private FederationRPCMetrics rpcMetrics;
+  private final String testFile = "/test.file";
+
+  /**
+   * Start a cluster using a router service that includes 2 namespaces,
+   * 6 namenodes and 6 datanodes (3 namenodes and 3 datanodes per nameservice).
+   * In each nameservice the three namenodes are switched to active, standby and
+   * observer respectively. The routers are then started with the client thread
+   * size, async handler count and async responder count all set to 1, and the
+   * first two nameservice ids are cached in {@code ns0} and {@code ns1}.
+   * The matching {@code shutdownCluster()} shuts the cluster down after the class.
+   */
+  @BeforeClass
+  public static void setUpCluster() throws Exception {
+    cluster = new MiniRouterDFSCluster(true, 2, 3,
+        DEFAULT_HEARTBEAT_INTERVAL_MS, 1000);
+    cluster.setNumDatanodesPerNameservice(3);
+
+    cluster.startCluster();
+
+    // Per nameservice: NAMENODES[0] active, NAMENODES[1] standby, NAMENODES[2] observer
+    if (cluster.isHighAvailability()) {
+      for (String ns : cluster.getNameservices()) {
+        cluster.switchToActive(ns, NAMENODES[0]);
+        cluster.switchToStandby(ns, NAMENODES[1]);
+        cluster.switchToObserver(ns, NAMENODES[2]);
+      }
+    }
+    // Build the router configuration from the metrics() and rpc() builder options
+    routerConf = new RouterConfigBuilder()
+        .metrics()
+        .rpc()
+        .build();
+
+    // One client thread, async handler and async responder make the Router easy to overload
+    routerConf.setInt(RBFConfigKeys.DFS_ROUTER_CLIENT_THREADS_SIZE, 1);
+    routerConf.setInt(DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT, 1);
+    routerConf.setInt(DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT, 1);
+    // Expire the DN report cache after 1 second to make the test faster
+    routerConf.setTimeDuration(
+        RBFConfigKeys.DN_REPORT_CACHE_EXPIRE, 1, TimeUnit.SECONDS);
+    cluster.addRouterOverrides(routerConf);
+    // Start the routers with the overrides registered above
+    cluster.startRouters();
+
+    // Register and verify all NNs with all routers
+    cluster.registerNamenodes();
+    cluster.waitNamenodeRegistration();
+    cluster.waitActiveNamespaces();
+    ns0 = cluster.getNameservices().get(0);
+    ns1 = cluster.getNameservices().get(1);
+  }
+
+  @AfterClass
+  public static void shutdownCluster() {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  /**
+   * Initialize the mount table, create a RouterAsyncRpcClient object, and create test file.
+   *
+   * Runs before every test: installs the mock mount locations, picks a random router
+   * from the cluster, captures its RPC metrics, file system and RPC server, initializes
+   * the RPC server's async thread pool, builds a fresh {@link RouterAsyncRpcClient} from
+   * the RPC server's resolver, monitor and state-id context, and writes a 1024-byte file
+   * at {@code testFile} through the router file system.
+   */
+  @Before
+  public void setup() throws Exception {
+    // Create mock locations
+    installMockLocations();
+
+    router = cluster.getRandomRouter();
+    rpcMetrics = router.getRouter().getRpcServer().getRPCMetrics();
+    routerFs = router.getFileSystem();
+    routerRpcServer = router.getRouterRpcServer();
+    routerRpcServer.initAsyncThreadPool();
+
+    // Create a RouterAsyncRpcClient object
+    asyncRpcClient = new RouterAsyncRpcClient(
+        routerConf, router.getRouter(), routerRpcServer.getNamenodeResolver(),
+        routerRpcServer.getRPCMonitor(),
+        routerRpcServer.getRouterStateIdContext());
+
+    // Create a 1024-byte test file; the tests assert on this length
+    FSDataOutputStream fsDataOutputStream = routerFs.create(
+        new Path(testFile), true);
+    fsDataOutputStream.write(new byte[1024]);
+    fsDataOutputStream.close();
+  }
+
+  @After
+  public void down() throws IOException {
+    // Clear the caller context and restore ns0's NAMENODES[0] as active for the next test
+    CallerContext.setCurrent(null);
+    cluster.switchToActive(ns0, NAMENODES[0]);
+    asyncRpcClient.getNamenodeResolver().updateActiveNamenode(
+        ns0, NetUtils.createSocketAddr(cluster
+            .getNamenode(ns0, NAMENODES[0]).getRpcAddress()));
+    // Delete the test file. NOTE(review): routerFs is dereferenced before the null check below.
+    boolean delete = routerFs.delete(new Path(testFile));
+    assertTrue(delete);
+    if (routerFs != null) {
+      routerFs.close();
+    }
+  }
+
+  /**
+   * Test the functionality of the asynchronous invokeSingle method.
+   *
+   * Invokes {@code getTransactionID} of {@link NamenodeProtocol} on {@code ns0} through
+   * the async client, fetches the result with {@code syncReturn(Long.class)} and checks
+   * that the id is positive, that the proxy-op and active-proxy-op counters each grew
+   * by exactly one, and that the processing and proxy averages are positive.
+   */
+  @Test
+  public void testInvokeSingle() throws Exception {
+    long proxyOps = rpcMetrics.getProxyOps();
+    long activeProxyOps = rpcMetrics.getActiveProxyOps();
+    RemoteMethod method =
+        new RemoteMethod(NamenodeProtocol.class, "getTransactionID");
+    asyncRpcClient.invokeSingle(ns0, method);
+    long id = syncReturn(Long.class);
+    assertTrue(id > 0);
+    assertEquals(proxyOps + 1, rpcMetrics.getProxyOps());
+    assertEquals(activeProxyOps + 1, rpcMetrics.getActiveProxyOps());
+    assertTrue(rpcMetrics.getProcessingAvg() > 0);
+    assertTrue(rpcMetrics.getProxyAvg() > 0);
+  }
+
+  /**
+   * Test the functionality of the asynchronous invokeAll and invokeConcurrent methods.
+   * Only invokeAll is called directly here. NOTE(review): invokeConcurrent is presumably
+   * exercised through invokeAll - confirm against RouterAsyncRpcClient.
+   *
+   * The locations of {@code /multDes/dir} are resolved first; {@code /multDes} is mounted
+   * on both ns0 and ns1 by {@code installMockLocations()}, which matches the expected
+   * increase of two proxy ops per invocation. {@code mkdirs} is invoked twice:
+   * <ul>
+   *   <li>with the last argument {@code false}, where {@code syncReturn} is expected to
+   *       throw {@link FileNotFoundException} ("Parent directory doesn't exist: /multDes");
+   *   <li>with the last argument {@code true}, where the call is expected to succeed.
+   * </ul>
+   * Finally {@code /multDes} is listed through the router file system and the processing
+   * and proxy averages are checked to be positive.
+   */
+  @Test
+  public void testInvokeAll() throws Exception {
+    long proxyOps = rpcMetrics.getProxyOps();
+    long activeProxyOps = rpcMetrics.getActiveProxyOps();
+    final List<RemoteLocation> locations =
+        routerRpcServer.getLocationsForPath("/multDes/dir", false);
+    RemoteMethod method = new RemoteMethod("mkdirs",
+        new Class<?>[] {String.class, FsPermission.class, boolean.class},
+        new RemoteParam(), new FsPermission(ALL, ALL, ALL), false);
+    asyncRpcClient.invokeAll(locations, method);
+    LambdaTestUtils.intercept(FileNotFoundException.class,
+        "Parent directory doesn't exist: /multDes",
+        () -> syncReturn(boolean.class));
+    assertEquals(proxyOps + 2, rpcMetrics.getProxyOps());
+    assertEquals(activeProxyOps + 2, rpcMetrics.getActiveProxyOps());
+
+    proxyOps = rpcMetrics.getProxyOps();
+    activeProxyOps = rpcMetrics.getActiveProxyOps();
+    method = new RemoteMethod("mkdirs",
+        new Class<?>[] {String.class, FsPermission.class, boolean.class},
+        new RemoteParam(), new FsPermission(ALL, ALL, ALL), true);
+    asyncRpcClient.invokeAll(locations, method);
+    Boolean success = syncReturn(Boolean.class);
+    assertTrue(success);
+    assertEquals(proxyOps + 2, rpcMetrics.getProxyOps());
+    assertEquals(activeProxyOps + 2, rpcMetrics.getActiveProxyOps());
+
+    FileStatus[] fileStatuses = routerFs.listStatus(new Path("/multDes"));
+    assertNotNull(fileStatuses);
+    assertTrue(rpcMetrics.getProcessingAvg() > 0);
+    assertTrue(rpcMetrics.getProxyAvg() > 0);
+  }
+
+  /**
+   * Test the functionality of the asynchronous invokeMethod method.
+   *
+   * {@code getFileInfo} on the test file is invoked in five scenarios:
+   * <ol>
+   *   <li>all ordered namenodes of ns0: the returned {@link FileStatus} has length 1024
+   *       and both proxy-op counters grow by one;
+   *   <li>an empty namenode list: invokeMethod itself throws {@link IOException}
+   *       ("No namenodes to invoke");
+   *   <li>only {@code namenodes.subList(1, 3)}: {@code syncReturn} throws
+   *       {@link StandbyException} and the proxy-op counters stay unchanged;
+   *   <li>after NAMENODES[0] of ns0 is switched to standby and the first ordered namenode
+   *       is reported unavailable: {@code syncReturn} throws {@link RetriableException}
+   *       and the no-namenodes counter equals 1;
+   *   <li>a {@code null} protocol: {@code syncReturn} throws {@link StandbyException}
+   *       ("Cannot get a connection") and the failure-communicate counter equals 1.
+   * </ol>
+   * The active state of ns0 changed by scenario four is restored in {@code down()}.
+   */
+  @Test
+  public void testInvokeMethod() throws Exception {
+    long proxyOps = rpcMetrics.getProxyOps();
+    long activeProxyOps = rpcMetrics.getActiveProxyOps();
+    RemoteMethod method = new RemoteMethod("getFileInfo",
+        new Class<?>[] {String.class}, new RemoteParam());
+    UserGroupInformation ugi = RouterRpcServer.getRemoteUser();
+    Class<?> protocol = method.getProtocol();
+    Object[] params = new String[]{testFile};
+    List<? extends FederationNamenodeContext> namenodes =
+        asyncRpcClient.getOrderedNamenodes(ns0, false);
+    asyncRpcClient.invokeMethod(ugi, namenodes, false,
+        protocol, method.getMethod(), params);
+    FileStatus fileStatus = syncReturn(FileStatus.class);
+    assertEquals(1024, fileStatus.getLen());
+    assertEquals(proxyOps + 1, rpcMetrics.getProxyOps());
+    assertEquals(activeProxyOps + 1, rpcMetrics.getActiveProxyOps());
+
+    LambdaTestUtils.intercept(IOException.class,
+        "No namenodes to invoke",
+        () -> asyncRpcClient.invokeMethod(ugi, new ArrayList<>(), false,
+            protocol, method.getMethod(), params));
+
+    proxyOps = rpcMetrics.getProxyOps();
+    activeProxyOps = rpcMetrics.getActiveProxyOps();
+    asyncRpcClient.invokeMethod(ugi, namenodes.subList(1, 3), false,
+        protocol, method.getMethod(), params);
+    LambdaTestUtils.intercept(StandbyException.class,
+        "No namenode available to invoke getFileInfo",
+        () -> syncReturn(FileStatus.class));
+    assertEquals(proxyOps, rpcMetrics.getProxyOps());
+    assertEquals(activeProxyOps, rpcMetrics.getActiveProxyOps());
+
+    cluster.switchToStandby(ns0, NAMENODES[0]);
+    asyncRpcClient.getNamenodeResolver().updateUnavailableNamenode(
+        ns0, NetUtils.createSocketAddr(namenodes.get(0).getRpcAddress()));
+    asyncRpcClient.invokeMethod(ugi, namenodes, false,
+        protocol, method.getMethod(), params);
+    LambdaTestUtils.intercept(RetriableException.class,
+        "No namenodes available under nameservice ns0",
+        () -> syncReturn(FileStatus.class));
+    assertEquals(1, rpcMetrics.getProxyOpNoNamenodes());
+
+    asyncRpcClient.invokeMethod(ugi, namenodes, false,
+        null, method.getMethod(), params);
+    LambdaTestUtils.intercept(StandbyException.class,
+        "Cannot get a connection",
+        () -> syncReturn(FileStatus.class));
+    assertEquals(1, rpcMetrics.getProxyOpFailureCommunicate());
+  }
+
+  /**
+   * Test the functionality of the asynchronous invokeSequential method.
+   *
+   * Resolves the locations of the test file, invokes {@code getBlockLocations} with
+   * the arguments 0 and 1024 over those locations, and checks that the returned
+   * {@link LocatedBlocks} reports a file length of 1024 and contains exactly one
+   * located block.
+   */
+  @Test
+  public void testInvokeSequential() throws Exception {
+    List<RemoteLocation> locations =
+        routerRpcServer.getLocationsForPath(testFile, false, false);
+    RemoteMethod remoteMethod = new RemoteMethod("getBlockLocations",
+        new Class<?>[] {String.class, long.class, long.class},
+        new RemoteParam(), 0, 1024);
+    asyncRpcClient.invokeSequential(locations, remoteMethod,
+        LocatedBlocks.class, null);
+    LocatedBlocks locatedBlocks = syncReturn(LocatedBlocks.class);
+    assertEquals(1024, locatedBlocks.getFileLength());
+    assertEquals(1, locatedBlocks.getLocatedBlocks().size());
+  }
+
+  /**
+   * Initialize the mount information.
+   *
+   * For every router of the cluster, the subcluster resolver (cast to
+   * {@link MockResolver}) gets three locations: {@code /} on ns0, and
+   * {@code /multDes} on both ns0 and ns1.
+   */
+  private void installMockLocations() {
+    List<MiniRouterDFSCluster.RouterContext> routers = cluster.getRouters();
+
+    for (MiniRouterDFSCluster.RouterContext rc : routers) {
+      Router r = rc.getRouter();
+      MockResolver resolver = (MockResolver) r.getSubclusterResolver();
+      resolver.addLocation("/", ns0, "/");
+      resolver.addLocation("/multDes", ns0, "/multDes");
+      resolver.addLocation("/multDes", ns1, "/multDes");
+    }
+  }
+}