
HDFS-17640. [ARR] RouterClientProtocol supports asynchronous rpc. (#7188)

Co-authored-by: Jian Zhang <keepromise@apache.org>
Author: hfutatzhanghb
Commit: c48db62c19

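The router picks synchronous or asynchronous protocol handlers once at startup, based on RouterRpcServer#isAsync(). A minimal sketch of turning the feature on, assuming the async RPC toggle is the "dfs.federation.router.async.rpc.enable" key (the key name is an assumption; it is not shown in this diff):

    import org.apache.hadoop.conf.Configuration;

    public class EnableAsyncRouterRpcSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Assumed key; with it set, RouterRpcServer#isAsync() should return
        // true and the constructors in this commit wire the RouterAsync*
        // variants instead of the synchronous ones.
        conf.setBoolean("dfs.federation.router.async.rpc.enable", true);
      }
    }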
+ 104 - 29
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java

@@ -85,6 +85,10 @@ import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver;
 import org.apache.hadoop.hdfs.server.federation.resolver.MountTableResolver;
 import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
 import org.apache.hadoop.hdfs.server.federation.resolver.RouterResolveException;
+import org.apache.hadoop.hdfs.server.federation.router.async.AsyncErasureCoding;
+import org.apache.hadoop.hdfs.server.federation.router.async.RouterAsyncCacheAdmin;
+import org.apache.hadoop.hdfs.server.federation.router.async.RouterAsyncSnapshot;
+import org.apache.hadoop.hdfs.server.federation.router.async.RouterAsyncStoragePolicy;
 import org.apache.hadoop.hdfs.server.federation.router.security.RouterSecurityManager;
 import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
@@ -166,7 +170,7 @@ public class RouterClientProtocol implements ClientProtocol {
   /** Router security manager to handle token operations. */
   private RouterSecurityManager securityManager = null;
 
-  RouterClientProtocol(Configuration conf, RouterRpcServer rpcServer) {
+  public RouterClientProtocol(Configuration conf, RouterRpcServer rpcServer) {
     this.rpcServer = rpcServer;
     this.rpcClient = rpcServer.getRPCClient();
     this.subclusterResolver = rpcServer.getSubclusterResolver();
@@ -194,10 +198,17 @@ public class RouterClientProtocol implements ClientProtocol {
     this.superGroup = conf.get(
         DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_KEY,
         DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_DEFAULT);
-    this.erasureCoding = new ErasureCoding(rpcServer);
-    this.storagePolicy = new RouterStoragePolicy(rpcServer);
-    this.snapshotProto = new RouterSnapshot(rpcServer);
-    this.routerCacheAdmin = new RouterCacheAdmin(rpcServer);
+    if (rpcServer.isAsync()) {
+      this.erasureCoding = new AsyncErasureCoding(rpcServer);
+      this.storagePolicy = new RouterAsyncStoragePolicy(rpcServer);
+      this.snapshotProto = new RouterAsyncSnapshot(rpcServer);
+      this.routerCacheAdmin = new RouterAsyncCacheAdmin(rpcServer);
+    } else {
+      this.erasureCoding = new ErasureCoding(rpcServer);
+      this.storagePolicy = new RouterStoragePolicy(rpcServer);
+      this.snapshotProto = new RouterSnapshot(rpcServer);
+      this.routerCacheAdmin = new RouterCacheAdmin(rpcServer);
+    }
     this.securityManager = rpcServer.getRouterSecurityManager();
     this.rbfRename = new RouterFederationRename(rpcServer, conf);
     this.defaultNameServiceEnabled = conf.getBoolean(
@@ -347,7 +358,7 @@ public class RouterClientProtocol implements ClientProtocol {
    * @throws IOException If this path is not fault tolerant or the exception
    *                     should not be retried (e.g., NSQuotaExceededException).
    */
-  private List<RemoteLocation> checkFaultTolerantRetry(
+  protected List<RemoteLocation> checkFaultTolerantRetry(
       final RemoteMethod method, final String src, final IOException ioe,
       final RemoteLocation excludeLoc, final List<RemoteLocation> locations)
           throws IOException {
@@ -820,7 +831,7 @@ public class RouterClientProtocol implements ClientProtocol {
   /**
   * For {@link #getListing(String,byte[],boolean) GetListing} to sort results.
    */
-  private static class GetListingComparator
+  protected static class GetListingComparator
       implements Comparator<byte[]>, Serializable {
     @Override
     public int compare(byte[] o1, byte[] o2) {
@@ -831,6 +842,10 @@ public class RouterClientProtocol implements ClientProtocol {
   private static GetListingComparator comparator =
       new GetListingComparator();
 
+  public static GetListingComparator getComparator() {
+    return comparator;
+  }
+
   @Override
   public DirectoryListing getListing(String src, byte[] startAfter,
       boolean needLocation) throws IOException {
@@ -1104,7 +1119,7 @@ public class RouterClientProtocol implements ClientProtocol {
     return mergeDtanodeStorageReport(dnSubcluster);
   }
 
-  private DatanodeStorageReport[] mergeDtanodeStorageReport(
+  protected DatanodeStorageReport[] mergeDtanodeStorageReport(
       Map<String, DatanodeStorageReport[]> dnSubcluster) {
     // Avoid repeating machines in multiple subclusters
     Map<String, DatanodeStorageReport> datanodesMap = new LinkedHashMap<>();
@@ -1335,20 +1350,23 @@ public class RouterClientProtocol implements ClientProtocol {
   }
 
   /**
-   * Get all the locations of the path for {@link this#getContentSummary(String)}.
+   * Get all the locations of the path for {@link RouterClientProtocol#getContentSummary(String)}.
    * For example, there are some mount points:
-   *   /a -> ns0 -> /a
-   *   /a/b -> ns0 -> /a/b
-   *   /a/b/c -> ns1 -> /a/b/c
+   * <p>
+   *   /a - [ns0 - /a]
+   *   /a/b - [ns0 - /a/b]
+   *   /a/b/c - [ns1 - /a/b/c]
+   * </p>
    * When the path is '/a', the result of locations should be
    * [RemoteLocation('/a', ns0, '/a'), RemoteLocation('/a/b/c', ns1, '/a/b/c')]
   * When the path is '/b', a NoLocationException will be thrown.
+   *
    * @param path the path to get content summary
    * @return one list contains all the remote location
-   * @throws IOException
+   * @throws IOException if an I/O error occurs
    */
   @VisibleForTesting
-  List<RemoteLocation> getLocationsForContentSummary(String path) throws IOException {
+  protected List<RemoteLocation> getLocationsForContentSummary(String path) throws IOException {
     // Try to get all the locations of the path.
     final Map<String, List<RemoteLocation>> ns2Locations = getAllLocations(path);
     if (ns2Locations.isEmpty()) {
@@ -2039,7 +2057,7 @@ public class RouterClientProtocol implements ClientProtocol {
    *         replacement value.
    * @throws IOException If the dst paths could not be determined.
    */
-  private RemoteParam getRenameDestinations(
+  protected RemoteParam getRenameDestinations(
       final List<RemoteLocation> srcLocations,
       final List<RemoteLocation> dstLocations) throws IOException {
 
@@ -2087,7 +2105,7 @@ public class RouterClientProtocol implements ClientProtocol {
    * @param summaries Collection of individual summaries.
    * @return Aggregated content summary.
    */
-  private ContentSummary aggregateContentSummary(
+  protected ContentSummary aggregateContentSummary(
       Collection<ContentSummary> summaries) {
     if (summaries.size() == 1) {
       return summaries.iterator().next();
@@ -2142,7 +2160,7 @@ public class RouterClientProtocol implements ClientProtocol {
    *         everywhere.
    * @throws IOException If all the locations throw an exception.
    */
-  private HdfsFileStatus getFileInfoAll(final List<RemoteLocation> locations,
+  protected HdfsFileStatus getFileInfoAll(final List<RemoteLocation> locations,
       final RemoteMethod method) throws IOException {
     return getFileInfoAll(locations, method, -1);
   }
@@ -2157,7 +2175,7 @@ public class RouterClientProtocol implements ClientProtocol {
    *         everywhere.
    * @throws IOException If all the locations throw an exception.
    */
-  private HdfsFileStatus getFileInfoAll(final List<RemoteLocation> locations,
+  protected HdfsFileStatus getFileInfoAll(final List<RemoteLocation> locations,
       final RemoteMethod method, long timeOutMs) throws IOException {
 
     // Get the file info from everybody
@@ -2186,12 +2204,11 @@ public class RouterClientProtocol implements ClientProtocol {
 
   /**
    * Get the permissions for the parent of a child with given permissions.
-   * Add implicit u+wx permission for parent. This is based on
-   * @{FSDirMkdirOp#addImplicitUwx}.
+   * Add implicit u+wx permission for parent. This is based on FSDirMkdirOp#addImplicitUwx.
    * @param mask The permission mask of the child.
    * @return The permission mask of the parent.
    */
-  private static FsPermission getParentPermission(final FsPermission mask) {
+  protected static FsPermission getParentPermission(final FsPermission mask) {
     FsPermission ret = new FsPermission(
         mask.getUserAction().or(FsAction.WRITE_EXECUTE),
         mask.getGroupAction(),
@@ -2208,7 +2225,7 @@ public class RouterClientProtocol implements ClientProtocol {
    * @return New HDFS file status representing a mount point.
    */
   @VisibleForTesting
-  HdfsFileStatus getMountPointStatus(
+  protected HdfsFileStatus getMountPointStatus(
       String name, int childrenNum, long date) {
     return getMountPointStatus(name, childrenNum, date, true);
   }
@@ -2223,7 +2240,7 @@ public class RouterClientProtocol implements ClientProtocol {
    * @return New HDFS file status representing a mount point.
    */
   @VisibleForTesting
-  HdfsFileStatus getMountPointStatus(
+  protected HdfsFileStatus getMountPointStatus(
       String name, int childrenNum, long date, boolean setPath) {
     long modTime = date;
     long accessTime = date;
@@ -2300,7 +2317,7 @@ public class RouterClientProtocol implements ClientProtocol {
    * @param path Name of the path to start checking dates from.
    * @return Map with the modification dates for all sub-entries.
    */
-  private Map<String, Long> getMountPointDates(String path) {
+  protected Map<String, Long> getMountPointDates(String path) {
     Map<String, Long> ret = new TreeMap<>();
     if (subclusterResolver instanceof MountTableResolver) {
       try {
@@ -2361,9 +2378,15 @@ public class RouterClientProtocol implements ClientProtocol {
   }
 
   /**
-   * Get listing on remote locations.
+   * Get a partial listing of the indicated directory.
+   *
+   * @param src the directory name
+   * @param startAfter the name to start after
+   * @param needLocation if blockLocations need to be returned
+   * @return a partial listing starting after startAfter
+   * @throws IOException if an I/O error occurs
    */
-  private List<RemoteResult<RemoteLocation, DirectoryListing>> getListingInt(
+  protected List<RemoteResult<RemoteLocation, DirectoryListing>> getListingInt(
       String src, byte[] startAfter, boolean needLocation) throws IOException {
     try {
       List<RemoteLocation> locations =
@@ -2400,9 +2423,9 @@ public class RouterClientProtocol implements ClientProtocol {
    * @param startAfter starting listing from client, used to define listing
    *                   start boundary
    * @param remainingEntries how many entries left from subcluster
-   * @return
+   * @return true if the mount point should be added, otherwise false.
    */
-  private static boolean shouldAddMountPoint(
+  protected static boolean shouldAddMountPoint(
       byte[] mountPoint, byte[] lastEntry, byte[] startAfter,
       int remainingEntries) {
     if (comparator.compare(mountPoint, startAfter) > 0 &&
@@ -2425,7 +2448,7 @@ public class RouterClientProtocol implements ClientProtocol {
    * @throws IOException if unable to get the file status.
    */
   @VisibleForTesting
-  boolean isMultiDestDirectory(String src) throws IOException {
+  protected boolean isMultiDestDirectory(String src) throws IOException {
     try {
       if (rpcServer.isPathAll(src)) {
         List<RemoteLocation> locations;
@@ -2449,4 +2472,56 @@ public class RouterClientProtocol implements ClientProtocol {
   public int getRouterFederationRenameCount() {
     return rbfRename.getRouterFederationRenameCount();
   }
+
+  public RouterRpcServer getRpcServer() {
+    return rpcServer;
+  }
+
+  public RouterRpcClient getRpcClient() {
+    return rpcClient;
+  }
+
+  public FileSubclusterResolver getSubclusterResolver() {
+    return subclusterResolver;
+  }
+
+  public ActiveNamenodeResolver getNamenodeResolver() {
+    return namenodeResolver;
+  }
+
+  public long getServerDefaultsLastUpdate() {
+    return serverDefaultsLastUpdate;
+  }
+
+  public long getServerDefaultsValidityPeriod() {
+    return serverDefaultsValidityPeriod;
+  }
+
+  public boolean isAllowPartialList() {
+    return allowPartialList;
+  }
+
+  public long getMountStatusTimeOut() {
+    return mountStatusTimeOut;
+  }
+
+  public String getSuperUser() {
+    return superUser;
+  }
+
+  public String getSuperGroup() {
+    return superGroup;
+  }
+
+  public RouterStoragePolicy getStoragePolicy() {
+    return storagePolicy;
+  }
+
+  public void setServerDefaultsLastUpdate(long serverDefaultsLastUpdate) {
+    this.serverDefaultsLastUpdate = serverDefaultsLastUpdate;
+  }
+
+  public RouterFederationRename getRbfRename() {
+    return rbfRename;
+  }
 }

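The visibility changes above (public constructor, protected hooks, public getters) let an async implementation in another package extend this class. A minimal, hypothetical subclass sketch showing the intended extension points (the real subclass, RouterAsyncClientProtocol, appears later in this commit):

    import java.io.IOException;
    import java.util.List;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
    import org.apache.hadoop.hdfs.server.federation.router.RemoteMethod;
    import org.apache.hadoop.hdfs.server.federation.router.RouterClientProtocol;
    import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer;

    // Hypothetical subclass; the real one is RouterAsyncClientProtocol below.
    public class IllustrativeAsyncClientProtocol extends RouterClientProtocol {

      public IllustrativeAsyncClientProtocol(Configuration conf,
          RouterRpcServer rpcServer) {
        super(conf, rpcServer); // legal now that the constructor is public
      }

      @Override
      protected List<RemoteLocation> checkFaultTolerantRetry(
          RemoteMethod method, String src, IOException ioe,
          RemoteLocation excludeLoc, List<RemoteLocation> locations)
          throws IOException {
        // Subclasses can reuse inherited wiring through the new accessors
        // (getRpcServer(), getRpcClient(), getRbfRename(), ...) instead of
        // duplicating private state.
        return super.checkFaultTolerantRetry(method, src, ioe, excludeLoc,
            locations);
      }
    }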
+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterFederationRename.java

@@ -93,7 +93,7 @@ public class RouterFederationRename {
    * @throws IOException if rename fails.
    * @return true if rename succeeds.
    */
-  boolean routerFedRename(final String src, final String dst,
+  public boolean routerFedRename(final String src, final String dst,
       final List<RemoteLocation> srcLocations,
       final List<RemoteLocation> dstLocations) throws IOException {
     if (!rpcServer.isEnableRenameAcrossNamespace()) {

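routerFedRename is widened to public because the async client protocol lives in the ...router.async subpackage and falls back to a cross-namespace rename when source and destination share no common location. A small illustrative helper (hypothetical class, placeholder inputs) showing the now-possible cross-package call:

    import java.io.IOException;
    import java.util.List;

    import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
    import org.apache.hadoop.hdfs.server.federation.router.RouterFederationRename;

    // Hypothetical helper in another package; inputs are placeholders.
    public final class FedRenameCallerSketch {
      private FedRenameCallerSketch() {
      }

      public static boolean renameAcrossNamespaces(
          RouterFederationRename rbfRename, String src, String dst,
          List<RemoteLocation> srcLocations, List<RemoteLocation> dstLocations)
          throws IOException {
        // Only compiles from outside the package because routerFedRename
        // is now public.
        return rbfRename.routerFedRename(src, dst, srcLocations, dstLocations);
      }
    }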
+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java

@@ -1969,7 +1969,7 @@ public class RouterRpcClient {
    * @param nsId namespaceID
    * @return whether the 'namespace' has observer reads enabled.
    */
-  boolean isNamespaceObserverReadEligible(String nsId) {
+  public boolean isNamespaceObserverReadEligible(String nsId) {
     return observerReadEnabledDefault != observerReadEnabledOverrides.contains(nsId);
   }
 
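The eligibility check above is a default-plus-overrides toggle: whatever the default is, membership in the overrides set flips it. Restated as a standalone sketch (the class below is illustrative, not the production code):

    import java.util.Set;

    // Truth table for: observerReadEnabledDefault != overrides.contains(nsId)
    //   default=true,  nsId listed     -> false (override disables observer reads)
    //   default=true,  nsId not listed -> true
    //   default=false, nsId listed     -> true  (override enables observer reads)
    //   default=false, nsId not listed -> false
    public final class ObserverReadToggleSketch {
      public static boolean eligible(boolean observerReadEnabledDefault,
          Set<String> overrides, String nsId) {
        return observerReadEnabledDefault != overrides.contains(nsId);
      }
    }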

+ 15 - 5
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java

@@ -75,7 +75,11 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.HAUtil;
 import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
 import org.apache.hadoop.hdfs.protocolPB.AsyncRpcProtocolPBUtil;
+import org.apache.hadoop.hdfs.server.federation.router.async.AsyncQuota;
+import org.apache.hadoop.hdfs.server.federation.router.async.RouterAsyncClientProtocol;
+import org.apache.hadoop.hdfs.server.federation.router.async.RouterAsyncNamenodeProtocol;
 import org.apache.hadoop.hdfs.server.federation.router.async.RouterAsyncRpcClient;
+import org.apache.hadoop.hdfs.server.federation.router.async.RouterAsyncUserProtocol;
 import org.apache.hadoop.hdfs.server.federation.router.async.utils.ApplyFunction;
 import org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncCatchFunction;
 import org.apache.hadoop.hdfs.server.federation.router.async.utils.CatchFunction;
@@ -288,6 +292,7 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol,
    * @param fileResolver File resolver to resolve file paths to subclusters.
    * @throws IOException If the RPC server could not be created.
    */
+  @SuppressWarnings("checkstyle:MethodLength")
   public RouterRpcServer(Configuration conf, Router router,
       ActiveNamenodeResolver nnResolver, FileSubclusterResolver fileResolver)
           throws IOException {
@@ -424,14 +429,19 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol,
     if (this.enableAsync) {
       this.rpcClient = new RouterAsyncRpcClient(this.conf, this.router,
           this.namenodeResolver, this.rpcMonitor, routerStateIdContext);
+      this.clientProto = new RouterAsyncClientProtocol(conf, this);
+      this.nnProto = new RouterAsyncNamenodeProtocol(this);
+      this.routerProto = new RouterAsyncUserProtocol(this);
+      this.quotaCall = new AsyncQuota(this.router, this);
     } else {
       this.rpcClient = new RouterRpcClient(this.conf, this.router,
           this.namenodeResolver, this.rpcMonitor, routerStateIdContext);
+      this.clientProto = new RouterClientProtocol(conf, this);
+      this.nnProto = new RouterNamenodeProtocol(this);
+      this.routerProto = new RouterUserProtocol(this);
+      this.quotaCall = new Quota(this.router, this);
     }
-    this.nnProto = new RouterNamenodeProtocol(this);
-    this.quotaCall = new Quota(this.router, this);
-    this.clientProto = new RouterClientProtocol(conf, this);
-    this.routerProto = new RouterUserProtocol(this);
+
     long dnCacheExpire = conf.getTimeDuration(
         DN_REPORT_CACHE_EXPIRE,
         DN_REPORT_CACHE_EXPIRE_MS_DEFAULT, TimeUnit.MILLISECONDS);
@@ -2193,7 +2203,7 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol,
    * @param path Path to check.
    * @return If a path should be in all subclusters.
    */
-  boolean isPathAll(final String path) {
+  public boolean isPathAll(final String path) {
     MountTable entry = getMountTable(path);
     return entry != null && entry.isAll();
   }

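The new file below leans on the AsyncUtil helpers (asyncApply, asyncComplete, asyncReturn, asyncTry/asyncCatch). The recurring pattern: kick off the remote call, chain a transformation that runs when it completes, and return a typed placeholder while the real value travels through a CompletableFuture that AsyncUtil tracks for the current handler (the per-thread tracking is an inference from how the calls are chained, not spelled out in this diff). An annotated condensation of the getStats() implementation that follows:

    // Condensed from RouterAsyncClientProtocol#getStats() below.
    RemoteMethod method = new RemoteMethod("getStats");
    Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
    // Enqueues the concurrent RPC and returns immediately.
    rpcClient.invokeConcurrent(nss, method, true, false, long[].class);
    // Registered callback; runs once the in-flight future completes.
    asyncApply(o -> {
      Map<FederationNamespaceInfo, long[]> results =
          (Map<FederationNamespaceInfo, long[]>) o;
      long[] combined = new long[STATS_ARRAY_LENGTH];
      for (long[] data : results.values()) {
        for (int i = 0; i < combined.length && i < data.length; i++) {
          if (data[i] >= 0) {
            combined[i] += data[i]; // sum non-negative stats across namespaces
          }
        }
      }
      return combined;
    });
    // Typed placeholder: not the real value, which arrives asynchronously.
    return asyncReturn(long[].class);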
+ 1083 - 0
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncClientProtocol.java

@@ -0,0 +1,1083 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router.async;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.crypto.CryptoProtocolVersion;
+import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FsServerDefaults;
+import org.apache.hadoop.fs.Options;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.DirectoryListing;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.protocol.LastBlockWithStatus;
+import org.apache.hadoop.hdfs.protocol.ReplicatedBlockStats;
+import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
+import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
+import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
+import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamespaceInfo;
+import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver;
+import org.apache.hadoop.hdfs.server.federation.resolver.MountTableResolver;
+import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
+import org.apache.hadoop.hdfs.server.federation.resolver.RouterResolveException;
+import org.apache.hadoop.hdfs.server.federation.router.NoLocationException;
+import org.apache.hadoop.hdfs.server.federation.router.RemoteMethod;
+import org.apache.hadoop.hdfs.server.federation.router.RemoteParam;
+import org.apache.hadoop.hdfs.server.federation.router.RemoteResult;
+import org.apache.hadoop.hdfs.server.federation.router.RouterClientProtocol;
+import org.apache.hadoop.hdfs.server.federation.router.RouterFederationRename;
+import org.apache.hadoop.hdfs.server.federation.router.RouterRpcClient;
+import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer;
+import org.apache.hadoop.hdfs.server.federation.router.async.utils.ApplyFunction;
+import org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncApplyFunction;
+import org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncCatchFunction;
+import org.apache.hadoop.hdfs.server.federation.router.async.utils.CatchFunction;
+import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
+import org.apache.hadoop.io.EnumSetWritable;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+import static org.apache.hadoop.hdfs.server.federation.router.FederationUtil.updateMountPointStatus;
+import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncApply;
+import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncCatch;
+import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncComplete;
+import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncCompleteWith;
+import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncForEach;
+import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncReturn;
+import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncTry;
+import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.getCompletableFuture;
+
+/**
+ * Module that implements all the async RPC calls in {@link ClientProtocol} in the
+ * {@link RouterRpcServer}.
+ */
+public class RouterAsyncClientProtocol extends RouterClientProtocol {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(RouterAsyncClientProtocol.class.getName());
+
+  private final RouterRpcServer rpcServer;
+  private final RouterRpcClient rpcClient;
+  private final RouterFederationRename rbfRename;
+  private final FileSubclusterResolver subclusterResolver;
+  private final ActiveNamenodeResolver namenodeResolver;
+  /** If it requires response from all subclusters. */
+  private final boolean allowPartialList;
+  /** Time out when getting the mount statistics. */
+  private long mountStatusTimeOut;
+  /** Identifier for the super user. */
+  private String superUser;
+  /** Identifier for the super group. */
+  private final String superGroup;
+  /**
+   * Cache the server defaults to prevent redundant calls to the namenode.
+   * As in DFSClient, caching saves effort when a router serves
+   * multiple clients.
+   */
+  private volatile FsServerDefaults serverDefaults;
+
+  public RouterAsyncClientProtocol(Configuration conf, RouterRpcServer rpcServer) {
+    super(conf, rpcServer);
+    this.rpcServer = rpcServer;
+    this.rpcClient = rpcServer.getRPCClient();
+    this.rbfRename = getRbfRename();
+    this.subclusterResolver = getSubclusterResolver();
+    this.namenodeResolver = getNamenodeResolver();
+    this.allowPartialList = isAllowPartialList();
+    this.mountStatusTimeOut = getMountStatusTimeOut();
+    this.superUser = getSuperUser();
+    this.superGroup = getSuperGroup();
+  }
+
+  @Override
+  public FsServerDefaults getServerDefaults() throws IOException {
+    rpcServer.checkOperation(NameNode.OperationCategory.READ);
+    long now = Time.monotonicNow();
+    if ((serverDefaults == null) || (now - getServerDefaultsLastUpdate()
+        > getServerDefaultsValidityPeriod())) {
+      RemoteMethod method = new RemoteMethod("getServerDefaults");
+      rpcServer.invokeAtAvailableNsAsync(method, FsServerDefaults.class);
+      asyncApply(o -> {
+        serverDefaults = (FsServerDefaults) o;
+        setServerDefaultsLastUpdate(now);
+        return serverDefaults;
+      });
+    } else {
+      asyncComplete(serverDefaults);
+    }
+    return asyncReturn(FsServerDefaults.class);
+  }
+
+  @Override
+  public HdfsFileStatus create(String src, FsPermission masked,
+      String clientName, EnumSetWritable<CreateFlag> flag,
+      boolean createParent, short replication, long blockSize,
+      CryptoProtocolVersion[] supportedVersions, String ecPolicyName,
+      String storagePolicy) throws IOException {
+    rpcServer.checkOperation(NameNode.OperationCategory.WRITE);
+
+    if (createParent && rpcServer.isPathAll(src)) {
+      int index = src.lastIndexOf(Path.SEPARATOR);
+      String parent = src.substring(0, index);
+      LOG.debug("Creating {} requires creating parent {}", src, parent);
+      FsPermission parentPermissions = getParentPermission(masked);
+      mkdirs(parent, parentPermissions, createParent);
+      asyncApply((ApplyFunction<Boolean, Boolean>) success -> {
+        if (!success) {
+          // This shouldn't happen as mkdirs either returns true or throws an exception
+          LOG.error("Couldn't create parents for {}", src);
+        }
+        return success;
+      });
+    }
+
+    RemoteMethod method = new RemoteMethod("create",
+        new Class<?>[] {String.class, FsPermission.class, String.class,
+            EnumSetWritable.class, boolean.class, short.class,
+            long.class, CryptoProtocolVersion[].class,
+            String.class, String.class},
+        new RemoteParam(), masked, clientName, flag, createParent,
+        replication, blockSize, supportedVersions, ecPolicyName, storagePolicy);
+    final List<RemoteLocation> locations =
+        rpcServer.getLocationsForPath(src, true);
+    final RemoteLocation[] createLocation = new RemoteLocation[1];
+    asyncTry(() -> {
+      rpcServer.getCreateLocationAsync(src, locations);
+      asyncApply((AsyncApplyFunction<RemoteLocation, Object>) remoteLocation -> {
+        createLocation[0] = remoteLocation;
+        rpcClient.invokeSingle(remoteLocation, method, HdfsFileStatus.class);
+        asyncApply((ApplyFunction<HdfsFileStatus, Object>) status -> {
+          status.setNamespace(remoteLocation.getNameserviceId());
+          return status;
+        });
+      });
+    });
+    asyncCatch((AsyncCatchFunction<Object, IOException>) (o, ioe) -> {
+      final List<RemoteLocation> newLocations = checkFaultTolerantRetry(
+          method, src, ioe, createLocation[0], locations);
+      rpcClient.invokeSequential(
+          newLocations, method, HdfsFileStatus.class, null);
+    }, IOException.class);
+
+    return asyncReturn(HdfsFileStatus.class);
+  }
+
+  @Override
+  public LastBlockWithStatus append(
+      String src, String clientName,
+      EnumSetWritable<CreateFlag> flag) throws IOException {
+    rpcServer.checkOperation(NameNode.OperationCategory.WRITE);
+
+    List<RemoteLocation> locations = rpcServer.getLocationsForPath(src, true);
+    RemoteMethod method = new RemoteMethod("append",
+        new Class<?>[] {String.class, String.class, EnumSetWritable.class},
+        new RemoteParam(), clientName, flag);
+    rpcClient.invokeSequential(method, locations, LastBlockWithStatus.class, null);
+    asyncApply((ApplyFunction<RemoteResult, LastBlockWithStatus>) result -> {
+      LastBlockWithStatus lbws = (LastBlockWithStatus) result.getResult();
+      lbws.getFileStatus().setNamespace(result.getLocation().getNameserviceId());
+      return lbws;
+    });
+    return asyncReturn(LastBlockWithStatus.class);
+  }
+
+  @Deprecated
+  @Override
+  public boolean rename(final String src, final String dst)
+      throws IOException {
+    rpcServer.checkOperation(NameNode.OperationCategory.WRITE);
+
+    final List<RemoteLocation> srcLocations =
+        rpcServer.getLocationsForPath(src, true, false);
+    final List<RemoteLocation> dstLocations =
+        rpcServer.getLocationsForPath(dst, false, false);
+    // srcLocations may be trimmed by getRenameDestinations()
+    final List<RemoteLocation> locs = new LinkedList<>(srcLocations);
+    RemoteParam dstParam = getRenameDestinations(locs, dstLocations);
+    if (locs.isEmpty()) {
+      asyncComplete(
+          rbfRename.routerFedRename(src, dst, srcLocations, dstLocations));
+      return asyncReturn(Boolean.class);
+    }
+    RemoteMethod method = new RemoteMethod("rename",
+        new Class<?>[] {String.class, String.class},
+        new RemoteParam(), dstParam);
+    isMultiDestDirectory(src);
+    asyncApply((AsyncApplyFunction<Boolean, Boolean>) isMultiDestDirectory -> {
+      if (isMultiDestDirectory) {
+        if (locs.size() != srcLocations.size()) {
+          throw new IOException("Rename of " + src + " to " + dst + " is not"
+              + " allowed. The number of remote locations for both source and"
+              + " target should be same.");
+        }
+        rpcClient.invokeAll(locs, method);
+      } else {
+        rpcClient.invokeSequential(locs, method, Boolean.class,
+            Boolean.TRUE);
+      }
+    });
+    return asyncReturn(Boolean.class);
+  }
+
+  @Override
+  public void rename2(
+      final String src, final String dst,
+      final Options.Rename... options) throws IOException {
+    rpcServer.checkOperation(NameNode.OperationCategory.WRITE);
+
+    final List<RemoteLocation> srcLocations =
+        rpcServer.getLocationsForPath(src, true, false);
+    final List<RemoteLocation> dstLocations =
+        rpcServer.getLocationsForPath(dst, false, false);
+    // srcLocations may be trimmed by getRenameDestinations()
+    final List<RemoteLocation> locs = new LinkedList<>(srcLocations);
+    RemoteParam dstParam = getRenameDestinations(locs, dstLocations);
+    if (locs.isEmpty()) {
+      rbfRename.routerFedRename(src, dst, srcLocations, dstLocations);
+      return;
+    }
+    RemoteMethod method = new RemoteMethod("rename2",
+        new Class<?>[] {String.class, String.class, options.getClass()},
+        new RemoteParam(), dstParam, options);
+    isMultiDestDirectory(src);
+    asyncApply((AsyncApplyFunction<Boolean, Boolean>) isMultiDestDirectory -> {
+      if (isMultiDestDirectory) {
+        if (locs.size() != srcLocations.size()) {
+          throw new IOException("Rename of " + src + " to " + dst + " is not"
+              + " allowed. The number of remote locations for both source and"
+              + " target should be same.");
+        }
+        rpcClient.invokeConcurrent(locs, method);
+      } else {
+        rpcClient.invokeSequential(locs, method, null, null);
+      }
+    });
+  }
+
+  @Override
+  public void concat(String trg, String[] src) throws IOException {
+    rpcServer.checkOperation(NameNode.OperationCategory.WRITE);
+
+    // Concat only takes effect when all the files are in the same namespace.
+    getFileRemoteLocation(trg);
+    asyncApply((AsyncApplyFunction<RemoteLocation, Object>) targetDestination -> {
+      if (targetDestination == null) {
+        throw new IOException("Cannot find target file - " + trg);
+      }
+      String targetNameService = targetDestination.getNameserviceId();
+      String[] sourceDestinations = new String[src.length];
+      int[] index = new int[1];
+      asyncForEach(Arrays.stream(src).iterator(), (forEachRun, sourceFile) -> {
+        getFileRemoteLocation(sourceFile);
+        asyncApply((ApplyFunction<RemoteLocation, Object>) srcLocation -> {
+          if (srcLocation == null) {
+            throw new IOException("Cannot find source file - " + sourceFile);
+          }
+          sourceDestinations[index[0]++] = srcLocation.getDest();
+          if (!targetNameService.equals(srcLocation.getNameserviceId())) {
+            throw new IOException("Cannot concatenate source file " + sourceFile
+                + " because it is located in a different namespace" + " with nameservice "
+                + srcLocation.getNameserviceId() + " from the target file with nameservice "
+                + targetNameService);
+          }
+          return null;
+        });
+      });
+      asyncApply((AsyncApplyFunction<Object, Object>) o -> {
+        // Invoke
+        RemoteMethod method = new RemoteMethod("concat",
+            new Class<?>[] {String.class, String[].class},
+            targetDestination.getDest(), sourceDestinations);
+        rpcClient.invokeSingle(targetDestination, method, Void.class);
+      });
+    });
+  }
+
+  @Override
+  public boolean mkdirs(String src, FsPermission masked, boolean createParent)
+      throws IOException {
+    rpcServer.checkOperation(NameNode.OperationCategory.WRITE);
+
+    final List<RemoteLocation> locations =
+        rpcServer.getLocationsForPath(src, false);
+    RemoteMethod method = new RemoteMethod("mkdirs",
+        new Class<?>[] {String.class, FsPermission.class, boolean.class},
+        new RemoteParam(), masked, createParent);
+
+    // Create in all locations
+    if (rpcServer.isPathAll(src)) {
+      return rpcClient.invokeAll(locations, method);
+    }
+
+    asyncComplete(false);
+    if (locations.size() > 1) {
+      // Check if this directory already exists
+      asyncTry(() -> {
+        getFileInfo(src);
+        asyncApply((ApplyFunction<HdfsFileStatus, Boolean>) fileStatus -> {
+          if (fileStatus != null) {
+            // If it already exists, the NN doesn't throw an exception; report success
+            return true;
+          }
+          return false;
+        });
+      });
+      asyncCatch((ret, ex) -> {
+        // Can't query if this file exists or not.
+        LOG.error("Error getting file info for {} while proxying mkdirs: {}",
+            src, ex.getMessage());
+        return false;
+      }, IOException.class);
+    }
+
+    final RemoteLocation firstLocation = locations.get(0);
+    asyncApply((AsyncApplyFunction<Boolean, Boolean>) success -> {
+      if (success) {
+        asyncComplete(true);
+        return;
+      }
+      asyncTry(() -> {
+        rpcClient.invokeSingle(firstLocation, method, Boolean.class);
+      });
+
+      asyncCatch((AsyncCatchFunction<Object, IOException>) (o, ioe) -> {
+        final List<RemoteLocation> newLocations = checkFaultTolerantRetry(
+            method, src, ioe, firstLocation, locations);
+        rpcClient.invokeSequential(
+            newLocations, method, Boolean.class, Boolean.TRUE);
+      }, IOException.class);
+    });
+
+    return asyncReturn(Boolean.class);
+  }
+
+  @Override
+  public DirectoryListing getListing(
+      String src, byte[] startAfter, boolean needLocation) throws IOException {
+    rpcServer.checkOperation(NameNode.OperationCategory.READ);
+    GetListingComparator comparator = RouterClientProtocol.getComparator();
+    getListingInt(src, startAfter, needLocation);
+    asyncApply((AsyncApplyFunction<List<RemoteResult<RemoteLocation, DirectoryListing>>, Object>)
+        listings -> {
+        TreeMap<byte[], HdfsFileStatus> nnListing = new TreeMap<>(comparator);
+        int totalRemainingEntries = 0;
+        final int[] remainingEntries = {0};
+        boolean namenodeListingExists = false;
+        // Check the subcluster listing with the smallest name to make sure
+        // no file is skipped across subclusters
+        byte[] lastName = null;
+        if (listings != null) {
+          for (RemoteResult<RemoteLocation, DirectoryListing> result : listings) {
+            if (result.hasException()) {
+              IOException ioe = result.getException();
+              if (ioe instanceof FileNotFoundException) {
+                RemoteLocation location = result.getLocation();
+                LOG.debug("Cannot get listing from {}", location);
+              } else if (!allowPartialList) {
+                throw ioe;
+              }
+            } else if (result.getResult() != null) {
+              DirectoryListing listing = result.getResult();
+              totalRemainingEntries += listing.getRemainingEntries();
+              HdfsFileStatus[] partialListing = listing.getPartialListing();
+              int length = partialListing.length;
+              if (length > 0) {
+                HdfsFileStatus lastLocalEntry = partialListing[length-1];
+                byte[] lastLocalName = lastLocalEntry.getLocalNameInBytes();
+                if (lastName == null ||
+                    comparator.compare(lastName, lastLocalName) > 0) {
+                  lastName = lastLocalName;
+                }
+              }
+            }
+          }
+
+          // Add existing entries
+          for (RemoteResult<RemoteLocation, DirectoryListing> result : listings) {
+            DirectoryListing listing = result.getResult();
+            if (listing != null) {
+              namenodeListingExists = true;
+              for (HdfsFileStatus file : listing.getPartialListing()) {
+                byte[] filename = file.getLocalNameInBytes();
+                if (totalRemainingEntries > 0 &&
+                    comparator.compare(filename, lastName) > 0) {
+                  // Discarding entries further than the lastName
+                  remainingEntries[0]++;
+                } else {
+                  nnListing.put(filename, file);
+                }
+              }
+              remainingEntries[0] += listing.getRemainingEntries();
+            }
+          }
+        }
+
+        // Add mount points at this level in the tree
+        final List<String> children = subclusterResolver.getMountPoints(src);
+        if (children != null) {
+          // Get the dates for each mount point
+          Map<String, Long> dates = getMountPointDates(src);
+          byte[] finalLastName = lastName;
+          asyncForEach(children.iterator(), (forEachRun, child) -> {
+            long date = 0;
+            if (dates != null && dates.containsKey(child)) {
+              date = dates.get(child);
+            }
+            Path childPath = new Path(src, child);
+            getMountPointStatus(childPath.toString(), 0, date);
+            asyncApply((ApplyFunction<HdfsFileStatus, Object>) dirStatus -> {
+              // if there is no subcluster path, always add mount point
+              byte[] bChild = DFSUtil.string2Bytes(child);
+              if (finalLastName == null) {
+                nnListing.put(bChild, dirStatus);
+              } else {
+                if (shouldAddMountPoint(bChild,
+                    finalLastName, startAfter, remainingEntries[0])) {
+                  // This may overwrite existing listing entries with the mount point
+                  // TODO don't add if already there?
+                  nnListing.put(bChild, dirStatus);
+                }
+              }
+              return null;
+            });
+          });
+          asyncApply(o -> {
+            // Update the remaining count to include left mount points
+            if (nnListing.size() > 0) {
+              byte[] lastListing = nnListing.lastKey();
+              for (int i = 0; i < children.size(); i++) {
+                byte[] bChild = DFSUtil.string2Bytes(children.get(i));
+                if (comparator.compare(bChild, lastListing) > 0) {
+                  remainingEntries[0] += (children.size() - i);
+                  break;
+                }
+              }
+            }
+            return null;
+          });
+        }
+        asyncComplete(namenodeListingExists);
+        asyncApply((ApplyFunction<Boolean, Object>) exists -> {
+          if (!exists && nnListing.size() == 0 && children == null) {
+            // NN returns a null object if the directory cannot be found and has no
+            // listing. If we didn't retrieve any NN listing data, and there are no
+            // mount points here, return null.
+            return null;
+          }
+
+          // Generate combined listing
+          HdfsFileStatus[] combinedData = new HdfsFileStatus[nnListing.size()];
+          combinedData = nnListing.values().toArray(combinedData);
+          return new DirectoryListing(combinedData, remainingEntries[0]);
+        });
+      });
+    return asyncReturn(DirectoryListing.class);
+  }
+
+  /**
+   * Get listing on remote locations.
+   */
+  @Override
+  protected List<RemoteResult<RemoteLocation, DirectoryListing>> getListingInt(
+      String src, byte[] startAfter, boolean needLocation) throws IOException {
+    List<RemoteLocation> locations =
+        rpcServer.getLocationsForPath(src, false, false);
+    // Locate the dir and fetch the listing.
+    if (locations.isEmpty()) {
+      asyncComplete(new ArrayList<>());
+      return asyncReturn(List.class);
+    }
+    asyncTry(() -> {
+      RemoteMethod method = new RemoteMethod("getListing",
+          new Class<?>[] {String.class, startAfter.getClass(), boolean.class},
+          new RemoteParam(), startAfter, needLocation);
+      rpcClient.invokeConcurrent(locations, method, false, -1,
+          DirectoryListing.class);
+    });
+    asyncCatch((CatchFunction<List, RouterResolveException>) (o, e) -> {
+      LOG.debug("Cannot get locations for {}, {}.", src, e.getMessage());
+      return new ArrayList<>();
+    }, RouterResolveException.class);
+    return asyncReturn(List.class);
+  }
+
+  @Override
+  public HdfsFileStatus getFileInfo(String src) throws IOException {
+    rpcServer.checkOperation(NameNode.OperationCategory.READ);
+
+    final IOException[] noLocationException = new IOException[1];
+    asyncTry(() -> {
+      final List<RemoteLocation> locations = rpcServer.getLocationsForPath(src, false, false);
+      RemoteMethod method = new RemoteMethod("getFileInfo",
+          new Class<?>[] {String.class}, new RemoteParam());
+      // If it's a directory, we check in all locations
+      if (rpcServer.isPathAll(src)) {
+        getFileInfoAll(locations, method);
+      } else {
+        // Check for file information sequentially
+        rpcClient.invokeSequential(locations, method, HdfsFileStatus.class, null);
+      }
+    });
+    asyncCatch((o, e) -> {
+      if (e instanceof NoLocationException
+          || e instanceof RouterResolveException) {
+        noLocationException[0] = e;
+      }
+      throw e;
+    }, IOException.class);
+
+    asyncApply((AsyncApplyFunction<HdfsFileStatus, Object>) ret -> {
+      // If there is no real path, check mount points
+      if (ret == null) {
+        List<String> children = subclusterResolver.getMountPoints(src);
+        if (children != null && !children.isEmpty()) {
+          Map<String, Long> dates = getMountPointDates(src);
+          long date = 0;
+          if (dates != null && dates.containsKey(src)) {
+            date = dates.get(src);
+          }
+          getMountPointStatus(src, children.size(), date, false);
+        } else if (children != null) {
+          // The src is a mount point, but there are no files or directories
+          getMountPointStatus(src, 0, 0, false);
+        } else {
+          asyncComplete(null);
+        }
+        asyncApply((ApplyFunction<HdfsFileStatus, HdfsFileStatus>) result -> {
+          // Can't find a mount point for the path and the path doesn't
+          // contain any sub mount points, so throw the NoLocationException
+          // to the client.
+          if (result == null && noLocationException[0] != null) {
+            throw noLocationException[0];
+          }
+
+          return result;
+        });
+      } else {
+        asyncComplete(ret);
+      }
+    });
+
+    return asyncReturn(HdfsFileStatus.class);
+  }
+
+  @Override
+  public RemoteLocation getFileRemoteLocation(String path) throws IOException {
+    rpcServer.checkOperation(NameNode.OperationCategory.READ);
+
+    final List<RemoteLocation> locations = rpcServer.getLocationsForPath(path, false, false);
+    if (locations.size() == 1) {
+      asyncComplete(locations.get(0));
+      return asyncReturn(RemoteLocation.class);
+    }
+
+    asyncForEach(locations.iterator(), (forEachRun, location) -> {
+      RemoteMethod method =
+          new RemoteMethod("getFileInfo", new Class<?>[] {String.class}, new RemoteParam());
+      rpcClient.invokeSequential(Collections.singletonList(location), method,
+          HdfsFileStatus.class, null);
+      asyncApply((ApplyFunction<HdfsFileStatus, RemoteLocation>) ret -> {
+        if (ret != null) {
+          forEachRun.breakNow();
+          return location;
+        }
+        return null;
+      });
+    });
+
+    return asyncReturn(RemoteLocation.class);
+  }
+
+  @Override
+  public HdfsFileStatus getMountPointStatus(
+      String name, int childrenNum, long date, boolean setPath) {
+    long modTime = date;
+    long accessTime = date;
+    final FsPermission[] permission = new FsPermission[]{FsPermission.getDirDefault()};
+    final String[] owner = new String[]{this.superUser};
+    final String[] group = new String[]{this.superGroup};
+    final int[] childrenNums = new int[]{childrenNum};
+    final EnumSet<HdfsFileStatus.Flags>[] flags =
+        new EnumSet[]{EnumSet.noneOf(HdfsFileStatus.Flags.class)};
+    asyncComplete(null);
+    if (getSubclusterResolver() instanceof MountTableResolver) {
+      asyncTry(() -> {
+        String mName = name.startsWith("/") ? name : "/" + name;
+        MountTableResolver mountTable = (MountTableResolver) subclusterResolver;
+        MountTable entry = mountTable.getMountPoint(mName);
+        if (entry != null) {
+          permission[0] = entry.getMode();
+          owner[0] = entry.getOwnerName();
+          group[0] = entry.getGroupName();
+
+          RemoteMethod method = new RemoteMethod("getFileInfo",
+              new Class<?>[] {String.class}, new RemoteParam());
+          getFileInfoAll(
+              entry.getDestinations(), method, mountStatusTimeOut);
+          asyncApply((ApplyFunction<HdfsFileStatus, HdfsFileStatus>) fInfo -> {
+            if (fInfo != null) {
+              permission[0] = fInfo.getPermission();
+              owner[0] = fInfo.getOwner();
+              group[0] = fInfo.getGroup();
+              childrenNums[0] = fInfo.getChildrenNum();
+              flags[0] = DFSUtil
+                  .getFlags(fInfo.isEncrypted(), fInfo.isErasureCoded(),
+                      fInfo.isSnapshotEnabled(), fInfo.hasAcl());
+            }
+            return fInfo;
+          });
+        }
+      });
+      asyncCatch((CatchFunction<HdfsFileStatus, IOException>) (status, e) -> {
+        LOG.error("Cannot get mount point: {}", e.getMessage());
+        return status;
+      }, IOException.class);
+    } else {
+      try {
+        UserGroupInformation ugi = RouterRpcServer.getRemoteUser();
+        owner[0] = ugi.getUserName();
+        group[0] = ugi.getPrimaryGroupName();
+      } catch (IOException e) {
+        String msg = "Cannot get remote user: " + e.getMessage();
+        if (UserGroupInformation.isSecurityEnabled()) {
+          LOG.error(msg);
+        } else {
+          LOG.debug(msg);
+        }
+      }
+    }
+    long inodeId = 0;
+    HdfsFileStatus.Builder builder = new HdfsFileStatus.Builder();
+    asyncApply((ApplyFunction<HdfsFileStatus, HdfsFileStatus>) status -> {
+      if (setPath) {
+        Path path = new Path(name);
+        String nameStr = path.getName();
+        builder.path(DFSUtil.string2Bytes(nameStr));
+      }
+
+      return builder.isdir(true)
+          .mtime(modTime)
+          .atime(accessTime)
+          .perm(permission[0])
+          .owner(owner[0])
+          .group(group[0])
+          .symlink(new byte[0])
+          .fileId(inodeId)
+          .children(childrenNums[0])
+          .flags(flags[0])
+          .build();
+    });
+    return asyncReturn(HdfsFileStatus.class);
+  }
+
+  @Override
+  protected HdfsFileStatus getFileInfoAll(final List<RemoteLocation> locations,
+      final RemoteMethod method, long timeOutMs) throws IOException {
+
+    asyncComplete(null);
+    // Get the file info from everybody
+    rpcClient.invokeConcurrent(locations, method, false, false, timeOutMs,
+        HdfsFileStatus.class);
+    asyncApply(res -> {
+      Map<RemoteLocation, HdfsFileStatus> results = (Map<RemoteLocation, HdfsFileStatus>) res;
+      int children = 0;
+      // We return the first file
+      HdfsFileStatus dirStatus = null;
+      for (RemoteLocation loc : locations) {
+        HdfsFileStatus fileStatus = results.get(loc);
+        if (fileStatus != null) {
+          children += fileStatus.getChildrenNum();
+          if (!fileStatus.isDirectory()) {
+            return fileStatus;
+          } else if (dirStatus == null) {
+            dirStatus = fileStatus;
+          }
+        }
+      }
+      if (dirStatus != null) {
+        return updateMountPointStatus(dirStatus, children);
+      }
+      return null;
+    });
+    return asyncReturn(HdfsFileStatus.class);
+  }
+
+  @Override
+  public boolean recoverLease(String src, String clientName) throws IOException {
+    super.recoverLease(src, clientName);
+    return asyncReturn(boolean.class);
+  }
+
+  @Override
+  public long[] getStats() throws IOException {
+    rpcServer.checkOperation(NameNode.OperationCategory.UNCHECKED);
+
+    RemoteMethod method = new RemoteMethod("getStats");
+    Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
+    rpcClient.invokeConcurrent(nss, method, true, false, long[].class);
+    asyncApply(o -> {
+      Map<FederationNamespaceInfo, long[]> results
+          = (Map<FederationNamespaceInfo, long[]>) o;
+      long[] combinedData = new long[STATS_ARRAY_LENGTH];
+      for (long[] data : results.values()) {
+        for (int i = 0; i < combinedData.length && i < data.length; i++) {
+          if (data[i] >= 0) {
+            combinedData[i] += data[i];
+          }
+        }
+      }
+      return combinedData;
+    });
+    return asyncReturn(long[].class);
+  }
+
+  @Override
+  public ReplicatedBlockStats getReplicatedBlockStats() throws IOException {
+    rpcServer.checkOperation(NameNode.OperationCategory.READ);
+
+    RemoteMethod method = new RemoteMethod("getReplicatedBlockStats");
+    Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
+    rpcClient.invokeConcurrent(nss, method, true,
+        false, ReplicatedBlockStats.class);
+    asyncApply(o -> {
+      Map<FederationNamespaceInfo, ReplicatedBlockStats> ret =
+          (Map<FederationNamespaceInfo, ReplicatedBlockStats>) o;
+      return ReplicatedBlockStats.merge(ret.values());
+    });
+    return asyncReturn(ReplicatedBlockStats.class);
+  }
+
+  @Override
+  public DatanodeInfo[] getDatanodeReport(HdfsConstants.DatanodeReportType type)
+      throws IOException {
+    rpcServer.checkOperation(NameNode.OperationCategory.UNCHECKED);
+    return rpcServer.getDatanodeReportAsync(type, true, 0);
+  }
+
+  @Override
+  public DatanodeStorageReport[] getDatanodeStorageReport(
+      HdfsConstants.DatanodeReportType type) throws IOException {
+    rpcServer.checkOperation(NameNode.OperationCategory.UNCHECKED);
+
+    rpcServer.getDatanodeStorageReportMapAsync(type);
+    asyncApply((ApplyFunction<Map<String, DatanodeStorageReport[]>, DatanodeStorageReport[]>)
+        dnSubcluster -> mergeDtanodeStorageReport(dnSubcluster));
+    return asyncReturn(DatanodeStorageReport[].class);
+  }
+
+  public DatanodeStorageReport[] getDatanodeStorageReport(
+      HdfsConstants.DatanodeReportType type, boolean requireResponse, long timeOutMs)
+      throws IOException {
+    rpcServer.checkOperation(NameNode.OperationCategory.UNCHECKED);
+
+    rpcServer.getDatanodeStorageReportMapAsync(type, requireResponse, timeOutMs);
+    asyncApply((ApplyFunction<Map<String, DatanodeStorageReport[]>, DatanodeStorageReport[]>)
+        dnSubcluster -> mergeDtanodeStorageReport(dnSubcluster));
+    return asyncReturn(DatanodeStorageReport[].class);
+  }
+
+  @Override
+  public boolean setSafeMode(HdfsConstants.SafeModeAction action,
+                             boolean isChecked) throws IOException {
+    rpcServer.checkOperation(NameNode.OperationCategory.WRITE);
+
+    // Set safe mode in all the name spaces
+    RemoteMethod method = new RemoteMethod("setSafeMode",
+        new Class<?>[] {HdfsConstants.SafeModeAction.class, boolean.class},
+        action, isChecked);
+    Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
+    rpcClient.invokeConcurrent(
+        nss, method, true, !isChecked, Boolean.class);
+
+    asyncApply(o -> {
+      Map<FederationNamespaceInfo, Boolean> results
+          = (Map<FederationNamespaceInfo, Boolean>) o;
+      // We only report true if all the namespaces are in safe mode
+      int numSafemode = 0;
+      for (boolean safemode : results.values()) {
+        if (safemode) {
+          numSafemode++;
+        }
+      }
+      return numSafemode == results.size();
+    });
+    return asyncReturn(Boolean.class);
+  }
+
+  @Override
+  public boolean saveNamespace(long timeWindow, long txGap) throws IOException {
+    rpcServer.checkOperation(NameNode.OperationCategory.UNCHECKED);
+
+    RemoteMethod method = new RemoteMethod("saveNamespace",
+        new Class<?>[] {long.class, long.class}, timeWindow, txGap);
+    final Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
+    rpcClient.invokeConcurrent(nss, method, true,
+        false, boolean.class);
+
+    asyncApply(o -> {
+      Map<FederationNamespaceInfo, Boolean> ret =
+          (Map<FederationNamespaceInfo, Boolean>) o;
+      boolean success = true;
+      for (boolean s : ret.values()) {
+        if (!s) {
+          success = false;
+          break;
+        }
+      }
+      return success;
+    });
+    return asyncReturn(Boolean.class);
+  }
+
+  @Override
+  public long rollEdits() throws IOException {
+    rpcServer.checkOperation(NameNode.OperationCategory.WRITE);
+
+    RemoteMethod method = new RemoteMethod("rollEdits", new Class<?>[] {});
+    final Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
+    rpcClient.invokeConcurrent(nss, method, true, false, long.class);
+    asyncApply(o -> {
+      Map<FederationNamespaceInfo, Long> ret =
+          (Map<FederationNamespaceInfo, Long>) o;
+      // Return the maximum txid
+      long txid = 0;
+      for (long t : ret.values()) {
+        if (t > txid) {
+          txid = t;
+        }
+      }
+      return txid;
+    });
+    return asyncReturn(long.class);
+  }
+
+  @Override
+  public boolean restoreFailedStorage(String arg) throws IOException {
+    rpcServer.checkOperation(NameNode.OperationCategory.UNCHECKED);
+
+    RemoteMethod method = new RemoteMethod("restoreFailedStorage",
+        new Class<?>[] {String.class}, arg);
+    final Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
+    rpcClient.invokeConcurrent(nss, method, true, false, Boolean.class);
+    asyncApply(o -> {
+      Map<FederationNamespaceInfo, Boolean> ret =
+          (Map<FederationNamespaceInfo, Boolean>) o;
+      boolean success = true;
+      for (boolean s : ret.values()) {
+        if (!s) {
+          success = false;
+          break;
+        }
+      }
+      return success;
+    });
+    return asyncReturn(boolean.class);
+  }
+
+  @Override
+  public RollingUpgradeInfo rollingUpgrade(HdfsConstants.RollingUpgradeAction action)
+      throws IOException {
+    rpcServer.checkOperation(NameNode.OperationCategory.READ);
+
+    RemoteMethod method = new RemoteMethod("rollingUpgrade",
+        new Class<?>[] {HdfsConstants.RollingUpgradeAction.class}, action);
+    final Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
+
+    rpcClient.invokeConcurrent(
+        nss, method, true, false, RollingUpgradeInfo.class);
+    asyncApply(o -> {
+      Map<FederationNamespaceInfo, RollingUpgradeInfo> ret =
+          (Map<FederationNamespaceInfo, RollingUpgradeInfo>) o;
+      // Return the first rolling upgrade info
+      RollingUpgradeInfo info = null;
+      for (RollingUpgradeInfo infoNs : ret.values()) {
+        if (info == null && infoNs != null) {
+          info = infoNs;
+        }
+      }
+      return info;
+    });
+    return asyncReturn(RollingUpgradeInfo.class);
+  }
+
+  @Override
+  public ContentSummary getContentSummary(String path) throws IOException {
+    rpcServer.checkOperation(NameNode.OperationCategory.READ);
+
+    // Get the summaries from regular files
+    final Collection<ContentSummary> summaries = new ArrayList<>();
+    final List<RemoteLocation> locations = getLocationsForContentSummary(path);
+    final RemoteMethod method = new RemoteMethod("getContentSummary",
+        new Class<?>[] {String.class}, new RemoteParam());
+    rpcClient.invokeConcurrent(locations, method,
+        false, -1, ContentSummary.class);
+
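+    // Inspect each per-location result below: a FileNotFoundException from
+    // one subcluster is only fatal if no other location returns a summary.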
+    asyncApply(o -> {
+      final List<RemoteResult<RemoteLocation, ContentSummary>> results =
+          (List<RemoteResult<RemoteLocation, ContentSummary>>) o;
+
+      FileNotFoundException notFoundException = null;
+      for (RemoteResult<RemoteLocation, ContentSummary> result : results) {
+        if (result.hasException()) {
+          IOException ioe = result.getException();
+          if (ioe instanceof FileNotFoundException) {
+            notFoundException = (FileNotFoundException)ioe;
+          } else if (!allowPartialList) {
+            throw ioe;
+          }
+        } else if (result.getResult() != null) {
+          summaries.add(result.getResult());
+        }
+      }
+
+      // Re-throw the original exception if no subcluster returned a summary
+      if (summaries.isEmpty() && notFoundException != null) {
+        throw notFoundException;
+      }
+      return aggregateContentSummary(summaries);
+    });
+
+    return asyncReturn(ContentSummary.class);
+  }
+
+  @Override
+  public long getCurrentEditLogTxid() throws IOException {
+    rpcServer.checkOperation(NameNode.OperationCategory.READ);
+
+    RemoteMethod method = new RemoteMethod(
+        "getCurrentEditLogTxid", new Class<?>[] {});
+    final Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
+    rpcClient.invokeConcurrent(nss, method, true, false, long.class);
+
+    asyncApply(o -> {
+      Map<FederationNamespaceInfo, Long> ret =
+          (Map<FederationNamespaceInfo, Long>) o;
+      // Return the maximum txid
+      long txid = 0;
+      for (long t : ret.values()) {
+        if (t > txid) {
+          txid = t;
+        }
+      }
+      return txid;
+    });
+    return asyncReturn(long.class);
+  }
+
+  @Override
+  public void msync() throws IOException {
+    rpcServer.checkOperation(NameNode.OperationCategory.READ, true);
+    // Only msync to nameservices with observer reads enabled.
+    Set<FederationNamespaceInfo> allNamespaces = namenodeResolver.getNamespaces();
+    RemoteMethod method = new RemoteMethod("msync");
+    Set<FederationNamespaceInfo> namespacesEligibleForObserverReads = allNamespaces
+        .stream()
+        .filter(ns -> rpcClient.isNamespaceObserverReadEligible(ns.getNameserviceId()))
+        .collect(Collectors.toSet());
+    if (namespacesEligibleForObserverReads.isEmpty()) {
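+      // Nothing to sync: complete the async call immediately with no result.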
+      asyncCompleteWith(CompletableFuture.completedFuture(null));
+      return;
+    }
+    rpcClient.invokeConcurrent(namespacesEligibleForObserverReads, method);
+  }
+
+  @Override
+  public boolean setReplication(String src, short replication)
+      throws IOException {
+    rpcServer.checkOperation(NameNode.OperationCategory.WRITE);
+
+    List<RemoteLocation> locations = rpcServer.getLocationsForPath(src, true);
+    RemoteMethod method = new RemoteMethod("setReplication",
+        new Class<?>[] {String.class, short.class}, new RemoteParam(),
+        replication);
+    if (rpcServer.isInvokeConcurrent(src)) {
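+      // When invoking all locations, report success only if every
+      // destination accepted the new replication factor.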
+      rpcClient.invokeConcurrent(locations, method, Boolean.class);
+      asyncApply(o -> {
+        Map<RemoteLocation, Boolean> results = (Map<RemoteLocation, Boolean>) o;
+        return !results.containsValue(false);
+      });
+    } else {
+      rpcClient.invokeSequential(locations, method, Boolean.class,
+          Boolean.TRUE);
+    }
+    return asyncReturn(boolean.class);
+  }
+
+  /**
+   * Checks if the path is a directory and is supposed to be present in all
+   * subclusters.
+   * @param src the source path
+   * @return true if the path is a directory and is supposed to be present in
+   *         all subclusters; false otherwise.
+   * @throws IOException if unable to get the file status.
+   */
+  @Override
+  public boolean isMultiDestDirectory(String src) throws IOException {
+    try {
+      if (rpcServer.isPathAll(src)) {
+        List<RemoteLocation> locations;
+        locations = rpcServer.getLocationsForPath(src, false, false);
+        RemoteMethod method = new RemoteMethod("getFileInfo",
+            new Class<?>[] {String.class}, new RemoteParam());
+        rpcClient.invokeSequential(locations,
+            method, HdfsFileStatus.class, null);
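+        // Chain the directory check onto the future installed by the
+        // sequential invocation above.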
+        CompletableFuture<Object> completableFuture = getCompletableFuture();
+        completableFuture = completableFuture.thenApply(o -> {
+          HdfsFileStatus fileStatus = (HdfsFileStatus) o;
+          if (fileStatus != null) {
+            return fileStatus.isDirectory();
+          } else {
+            LOG.debug("The destination {} doesn't exist.", src);
+          }
+          return false;
+        });
+        asyncCompleteWith(completableFuture);
+        return asyncReturn(Boolean.class);
+      }
+    } catch (UnresolvedPathException e) {
+      LOG.debug("The destination {} is a symlink.", src);
+    }
+    asyncCompleteWith(CompletableFuture.completedFuture(false));
+    return asyncReturn(Boolean.class);
+  }
+}
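
Note on the ARR idiom used throughout this file: asyncApply, asyncReturn, asyncCompleteWith and the tests' syncReturn come from the router's async utils. As a rough mental model only (a minimal sketch with hypothetical names, not the actual AsyncUtil implementation), the pattern can be pictured as a thread-local CompletableFuture that each step chains onto:

import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Minimal sketch of the thread-local-future idiom; NOT the real AsyncUtil.
final class AsyncIdiomSketch {
  private static final ThreadLocal<CompletableFuture<Object>> CUR =
      new ThreadLocal<>();

  // An async invocation installs its in-flight result and returns at once.
  static void asyncCompleteWith(CompletableFuture<Object> future) {
    CUR.set(future);
  }

  // asyncApply() chains a transformation onto the in-flight result.
  static void asyncApply(Function<Object, Object> fn) {
    CUR.set(CUR.get().thenApply(fn));
  }

  // asyncReturn() yields a typed placeholder; the method's declared return
  // value is ignored on the async path (the real utils return a per-type
  // default rather than null).
  @SuppressWarnings("unchecked")
  static <R> R asyncReturn(Class<R> clazz) {
    return (R) null;
  }

  // syncReturn(), used by the tests, blocks until the future completes.
  @SuppressWarnings("unchecked")
  static <R> R syncReturn(Class<R> clazz) throws Exception {
    return (R) CUR.get().get();
  }
}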

+ 4 - 2
hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncProtocolTestBase.java

@@ -56,6 +56,7 @@ public class RouterAsyncProtocolTestBase {
   private FileSystem routerFs;
   private RouterRpcServer routerRpcServer;
   private RouterRpcServer routerAsyncRpcServer;
+  protected static final String TEST_DIR_PATH = "/testdir";
 
   @BeforeClass
   public static void setUpCluster() throws Exception {
@@ -114,19 +115,20 @@ public class RouterAsyncProtocolTestBase {
         routerRpcServer.getRouterStateIdContext());
     routerAsyncRpcServer = Mockito.spy(routerRpcServer);
     Mockito.when(routerAsyncRpcServer.getRPCClient()).thenReturn(asyncRpcClient);
+    Mockito.when(routerAsyncRpcServer.isAsync()).thenReturn(true);
 
     // Create mock locations
     MockResolver resolver = (MockResolver) router.getRouter().getSubclusterResolver();
     resolver.addLocation("/", ns0, "/");
     FsPermission permission = new FsPermission("705");
-    routerFs.mkdirs(new Path("/testdir"), permission);
+    routerFs.mkdirs(new Path(TEST_DIR_PATH), permission);
   }
 
   @After
   public void tearDown() throws IOException {
     // clear client context
     CallerContext.setCurrent(null);
-    boolean delete = routerFs.delete(new Path("/testdir"));
+    boolean delete = routerFs.delete(new Path(TEST_DIR_PATH));
     assertTrue(delete);
     if (routerFs != null) {
       routerFs.close();

+ 144 - 0
hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncClientProtocol.java

@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router.async;
+
+import org.apache.hadoop.crypto.CryptoProtocolVersion;
+import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FsServerDefaults;
+import org.apache.hadoop.fs.permission.AclEntry;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.DirectoryListing;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
+import org.apache.hadoop.hdfs.server.federation.router.RouterClientProtocol;
+import org.apache.hadoop.io.EnumSetWritable;
+import org.apache.hadoop.util.Lists;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.EnumSet;
+import java.util.List;
+
+import static org.apache.hadoop.crypto.CryptoProtocolVersion.ENCRYPTION_ZONES;
+import static org.apache.hadoop.fs.permission.AclEntryScope.DEFAULT;
+import static org.apache.hadoop.fs.permission.AclEntryType.USER;
+import static org.apache.hadoop.fs.permission.FsAction.ALL;
+import static org.apache.hadoop.fs.permission.FsAction.NONE;
+import static org.apache.hadoop.fs.permission.FsAction.READ;
+import static org.apache.hadoop.fs.permission.FsAction.READ_WRITE;
+import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.syncReturn;
+import static org.apache.hadoop.hdfs.server.namenode.AclTestHelpers.aclEntry;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Used to test the functionality of {@link RouterAsyncClientProtocol}.
+ */
+public class TestRouterAsyncClientProtocol extends RouterAsyncProtocolTestBase {
+  private RouterAsyncClientProtocol asyncClientProtocol;
+  private RouterClientProtocol clientProtocol;
+  private final String testPath = TEST_DIR_PATH + "/test";
+
+  @Before
+  public void setup() throws IOException {
+    asyncClientProtocol = new RouterAsyncClientProtocol(getRouterConf(), getRouterAsyncRpcServer());
+    clientProtocol = new RouterClientProtocol(getRouterConf(), getRouterRpcServer());
+  }
+
+  @Test
+  public void testGetServerDefaults() throws Exception {
+    FsServerDefaults serverDefaults = clientProtocol.getServerDefaults();
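+    // The async call returns a placeholder; syncReturn() blocks until the
+    // real FsServerDefaults arrives and the two results can be compared.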
+    asyncClientProtocol.getServerDefaults();
+    FsServerDefaults fsServerDefaults = syncReturn(FsServerDefaults.class);
+    assertEquals(serverDefaults.getBlockSize(), fsServerDefaults.getBlockSize());
+    assertEquals(serverDefaults.getReplication(), fsServerDefaults.getReplication());
+    assertEquals(serverDefaults.getChecksumType(), fsServerDefaults.getChecksumType());
+    assertEquals(
+        serverDefaults.getDefaultStoragePolicyId(), fsServerDefaults.getDefaultStoragePolicyId());
+  }
+
+  @Test
+  public void testClientProtocolRpc() throws Exception {
+    asyncClientProtocol.mkdirs(testPath, new FsPermission(ALL, ALL, ALL), false);
+    Boolean success = syncReturn(Boolean.class);
+    assertTrue(success);
+
+    asyncClientProtocol.setPermission(testPath, new FsPermission(READ_WRITE, READ, NONE));
+    syncReturn(Void.class);
+
+    asyncClientProtocol.getFileInfo(testPath);
+    HdfsFileStatus hdfsFileStatus = syncReturn(HdfsFileStatus.class);
+    assertEquals(new FsPermission(READ_WRITE, READ, NONE), hdfsFileStatus.getPermission());
+
+    List<AclEntry> aclSpec = Lists.newArrayList(aclEntry(DEFAULT, USER, "tmpUser", ALL));
+    asyncClientProtocol.setAcl(testPath, aclSpec);
+    syncReturn(Void.class);
+    asyncClientProtocol.setOwner(testPath, "tmpUser", "tmpUserGroup");
+    syncReturn(Void.class);
+
+    asyncClientProtocol.getFileInfo(testPath);
+    hdfsFileStatus = syncReturn(HdfsFileStatus.class);
+    assertEquals("tmpUser", hdfsFileStatus.getOwner());
+    assertEquals("tmpUserGroup", hdfsFileStatus.getGroup());
+
+    asyncClientProtocol.create(testPath + "/testCreate.file",
+        new FsPermission(ALL, ALL, ALL), "testAsyncClient",
+        new EnumSetWritable<>(EnumSet.of(CreateFlag.CREATE)),
+        false, (short) 1, 128 * 1024 * 1024L,
+        new CryptoProtocolVersion[]{ENCRYPTION_ZONES},
+        null, null);
+    hdfsFileStatus = syncReturn(HdfsFileStatus.class);
+    assertTrue(hdfsFileStatus.isFile());
+    assertEquals(128 * 1024 * 1024, hdfsFileStatus.getBlockSize());
+
+    asyncClientProtocol.getFileRemoteLocation(testPath);
+    RemoteLocation remoteLocation = syncReturn(RemoteLocation.class);
+    assertNotNull(remoteLocation);
+    assertEquals(getNs0(), remoteLocation.getNameserviceId());
+    assertEquals(testPath, remoteLocation.getSrc());
+
+    asyncClientProtocol.getListing(testPath, new byte[1], true);
+    DirectoryListing directoryListing = syncReturn(DirectoryListing.class);
+    assertEquals(1, directoryListing.getPartialListing().length);
+
+    asyncClientProtocol.getDatanodeReport(HdfsConstants.DatanodeReportType.ALL);
+    DatanodeInfo[] datanodeInfos = syncReturn(DatanodeInfo[].class);
+    assertEquals(3, datanodeInfos.length);
+
+    asyncClientProtocol.createSymlink(testPath + "/testCreate.file",
+        "/link/link.file", new FsPermission(ALL, ALL, ALL), true);
+    syncReturn(Void.class);
+
+    asyncClientProtocol.getFileLinkInfo("/link/link.file");
+    hdfsFileStatus = syncReturn(HdfsFileStatus.class);
+    assertEquals("testCreate.file", hdfsFileStatus.getSymlink().getName());
+
+    asyncClientProtocol.rename(testPath + "/testCreate.file",
+        testPath + "/testRename.file");
+    success = syncReturn(boolean.class);
+    assertTrue(success);
+
+    asyncClientProtocol.delete(testPath, true);
+    success = syncReturn(boolean.class);
+    assertTrue(success);
+  }
+}