浏览代码

Revert "HDFS-17640.[ARR] RouterClientProtocol supports asynchronous rpc. (#7188)"

This reverts commit c48db62c193fb54784a6aa66e40511437c2d5ff6.
Jian Zhang 4 月之前
父节点
当前提交
ddbddf3a0a

+ 29 - 104
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java

@@ -85,10 +85,6 @@ import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver;
 import org.apache.hadoop.hdfs.server.federation.resolver.MountTableResolver;
 import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
 import org.apache.hadoop.hdfs.server.federation.resolver.RouterResolveException;
-import org.apache.hadoop.hdfs.server.federation.router.async.AsyncErasureCoding;
-import org.apache.hadoop.hdfs.server.federation.router.async.RouterAsyncCacheAdmin;
-import org.apache.hadoop.hdfs.server.federation.router.async.RouterAsyncSnapshot;
-import org.apache.hadoop.hdfs.server.federation.router.async.RouterAsyncStoragePolicy;
 import org.apache.hadoop.hdfs.server.federation.router.security.RouterSecurityManager;
 import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
@@ -170,7 +166,7 @@ public class RouterClientProtocol implements ClientProtocol {
   /** Router security manager to handle token operations. */
   private RouterSecurityManager securityManager = null;
 
-  public RouterClientProtocol(Configuration conf, RouterRpcServer rpcServer) {
+  RouterClientProtocol(Configuration conf, RouterRpcServer rpcServer) {
     this.rpcServer = rpcServer;
     this.rpcClient = rpcServer.getRPCClient();
     this.subclusterResolver = rpcServer.getSubclusterResolver();
@@ -198,17 +194,10 @@ public class RouterClientProtocol implements ClientProtocol {
     this.superGroup = conf.get(
         DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_KEY,
         DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_DEFAULT);
-    if (rpcServer.isAsync()) {
-      this.erasureCoding = new AsyncErasureCoding(rpcServer);
-      this.storagePolicy = new RouterAsyncStoragePolicy(rpcServer);
-      this.snapshotProto = new RouterAsyncSnapshot(rpcServer);
-      this.routerCacheAdmin = new RouterAsyncCacheAdmin(rpcServer);
-    } else {
-      this.erasureCoding = new ErasureCoding(rpcServer);
-      this.storagePolicy = new RouterStoragePolicy(rpcServer);
-      this.snapshotProto = new RouterSnapshot(rpcServer);
-      this.routerCacheAdmin = new RouterCacheAdmin(rpcServer);
-    }
+    this.erasureCoding = new ErasureCoding(rpcServer);
+    this.storagePolicy = new RouterStoragePolicy(rpcServer);
+    this.snapshotProto = new RouterSnapshot(rpcServer);
+    this.routerCacheAdmin = new RouterCacheAdmin(rpcServer);
     this.securityManager = rpcServer.getRouterSecurityManager();
     this.rbfRename = new RouterFederationRename(rpcServer, conf);
     this.defaultNameServiceEnabled = conf.getBoolean(
@@ -358,7 +347,7 @@ public class RouterClientProtocol implements ClientProtocol {
    * @throws IOException If this path is not fault tolerant or the exception
    *                     should not be retried (e.g., NSQuotaExceededException).
    */
-  protected List<RemoteLocation> checkFaultTolerantRetry(
+  private List<RemoteLocation> checkFaultTolerantRetry(
       final RemoteMethod method, final String src, final IOException ioe,
       final RemoteLocation excludeLoc, final List<RemoteLocation> locations)
           throws IOException {
@@ -831,7 +820,7 @@ public class RouterClientProtocol implements ClientProtocol {
   /**
    * For {@link #getListing(String,byte[],boolean) GetLisiting} to sort results.
    */
-  protected static class GetListingComparator
+  private static class GetListingComparator
       implements Comparator<byte[]>, Serializable {
     @Override
     public int compare(byte[] o1, byte[] o2) {
@@ -842,10 +831,6 @@ public class RouterClientProtocol implements ClientProtocol {
   private static GetListingComparator comparator =
       new GetListingComparator();
 
-  public static GetListingComparator getComparator() {
-    return comparator;
-  }
-
   @Override
   public DirectoryListing getListing(String src, byte[] startAfter,
       boolean needLocation) throws IOException {
@@ -1119,7 +1104,7 @@ public class RouterClientProtocol implements ClientProtocol {
     return mergeDtanodeStorageReport(dnSubcluster);
   }
 
-  protected DatanodeStorageReport[] mergeDtanodeStorageReport(
+  private DatanodeStorageReport[] mergeDtanodeStorageReport(
       Map<String, DatanodeStorageReport[]> dnSubcluster) {
     // Avoid repeating machines in multiple subclusters
     Map<String, DatanodeStorageReport> datanodesMap = new LinkedHashMap<>();
@@ -1350,23 +1335,20 @@ public class RouterClientProtocol implements ClientProtocol {
   }
 
   /**
-   * Get all the locations of the path for {@link RouterClientProtocol#getContentSummary(String)}.
+   * Get all the locations of the path for {@link this#getContentSummary(String)}.
    * For example, there are some mount points:
-   * <p>
-   *   /a - [ns0 - /a]
-   *   /a/b - [ns0 - /a/b]
-   *   /a/b/c - [ns1 - /a/b/c]
-   * </p>
+   *   /a -> ns0 -> /a
+   *   /a/b -> ns0 -> /a/b
+   *   /a/b/c -> ns1 -> /a/b/c
    * When the path is '/a', the result of locations should be
    * [RemoteLocation('/a', ns0, '/a'), RemoteLocation('/a/b/c', ns1, '/a/b/c')]
    * When the path is '/b', will throw NoLocationException.
-   *
    * @param path the path to get content summary
    * @return one list contains all the remote location
-   * @throws IOException if an I/O error occurs
+   * @throws IOException
    */
   @VisibleForTesting
-  protected List<RemoteLocation> getLocationsForContentSummary(String path) throws IOException {
+  List<RemoteLocation> getLocationsForContentSummary(String path) throws IOException {
     // Try to get all the locations of the path.
     final Map<String, List<RemoteLocation>> ns2Locations = getAllLocations(path);
     if (ns2Locations.isEmpty()) {
@@ -2057,7 +2039,7 @@ public class RouterClientProtocol implements ClientProtocol {
    *         replacement value.
    * @throws IOException If the dst paths could not be determined.
    */
-  protected RemoteParam getRenameDestinations(
+  private RemoteParam getRenameDestinations(
       final List<RemoteLocation> srcLocations,
       final List<RemoteLocation> dstLocations) throws IOException {
 
@@ -2105,7 +2087,7 @@ public class RouterClientProtocol implements ClientProtocol {
    * @param summaries Collection of individual summaries.
    * @return Aggregated content summary.
    */
-  protected ContentSummary aggregateContentSummary(
+  private ContentSummary aggregateContentSummary(
       Collection<ContentSummary> summaries) {
     if (summaries.size() == 1) {
       return summaries.iterator().next();
@@ -2160,7 +2142,7 @@ public class RouterClientProtocol implements ClientProtocol {
    *         everywhere.
    * @throws IOException If all the locations throw an exception.
    */
-  protected HdfsFileStatus getFileInfoAll(final List<RemoteLocation> locations,
+  private HdfsFileStatus getFileInfoAll(final List<RemoteLocation> locations,
       final RemoteMethod method) throws IOException {
     return getFileInfoAll(locations, method, -1);
   }
@@ -2175,7 +2157,7 @@ public class RouterClientProtocol implements ClientProtocol {
    *         everywhere.
    * @throws IOException If all the locations throw an exception.
    */
-  protected HdfsFileStatus getFileInfoAll(final List<RemoteLocation> locations,
+  private HdfsFileStatus getFileInfoAll(final List<RemoteLocation> locations,
       final RemoteMethod method, long timeOutMs) throws IOException {
 
     // Get the file info from everybody
@@ -2204,11 +2186,12 @@ public class RouterClientProtocol implements ClientProtocol {
 
   /**
    * Get the permissions for the parent of a child with given permissions.
-   * Add implicit u+wx permission for parent. This is based on FSDirMkdirOp#addImplicitUwx.
+   * Add implicit u+wx permission for parent. This is based on
+   * @{FSDirMkdirOp#addImplicitUwx}.
    * @param mask The permission mask of the child.
    * @return The permission mask of the parent.
    */
-  protected static FsPermission getParentPermission(final FsPermission mask) {
+  private static FsPermission getParentPermission(final FsPermission mask) {
     FsPermission ret = new FsPermission(
         mask.getUserAction().or(FsAction.WRITE_EXECUTE),
         mask.getGroupAction(),
@@ -2225,7 +2208,7 @@ public class RouterClientProtocol implements ClientProtocol {
    * @return New HDFS file status representing a mount point.
    */
   @VisibleForTesting
-  protected HdfsFileStatus getMountPointStatus(
+  HdfsFileStatus getMountPointStatus(
       String name, int childrenNum, long date) {
     return getMountPointStatus(name, childrenNum, date, true);
   }
@@ -2240,7 +2223,7 @@ public class RouterClientProtocol implements ClientProtocol {
    * @return New HDFS file status representing a mount point.
    */
   @VisibleForTesting
-  protected HdfsFileStatus getMountPointStatus(
+  HdfsFileStatus getMountPointStatus(
       String name, int childrenNum, long date, boolean setPath) {
     long modTime = date;
     long accessTime = date;
@@ -2317,7 +2300,7 @@ public class RouterClientProtocol implements ClientProtocol {
    * @param path Name of the path to start checking dates from.
    * @return Map with the modification dates for all sub-entries.
    */
-  protected Map<String, Long> getMountPointDates(String path) {
+  private Map<String, Long> getMountPointDates(String path) {
     Map<String, Long> ret = new TreeMap<>();
     if (subclusterResolver instanceof MountTableResolver) {
       try {
@@ -2378,15 +2361,9 @@ public class RouterClientProtocol implements ClientProtocol {
   }
 
   /**
-   * Get a partial listing of the indicated directory.
-   *
-   * @param src the directory name
-   * @param startAfter the name to start after
-   * @param needLocation if blockLocations need to be returned
-   * @return a partial listing starting after startAfter
-   * @throws IOException if other I/O error occurred
+   * Get listing on remote locations.
    */
-  protected List<RemoteResult<RemoteLocation, DirectoryListing>> getListingInt(
+  private List<RemoteResult<RemoteLocation, DirectoryListing>> getListingInt(
       String src, byte[] startAfter, boolean needLocation) throws IOException {
     try {
       List<RemoteLocation> locations =
@@ -2423,9 +2400,9 @@ public class RouterClientProtocol implements ClientProtocol {
    * @param startAfter starting listing from client, used to define listing
    *                   start boundary
    * @param remainingEntries how many entries left from subcluster
-   * @return true if should add mount point, otherwise false;
+   * @return
    */
-  protected static boolean shouldAddMountPoint(
+  private static boolean shouldAddMountPoint(
       byte[] mountPoint, byte[] lastEntry, byte[] startAfter,
       int remainingEntries) {
     if (comparator.compare(mountPoint, startAfter) > 0 &&
@@ -2448,7 +2425,7 @@ public class RouterClientProtocol implements ClientProtocol {
    * @throws IOException if unable to get the file status.
    */
   @VisibleForTesting
-  protected boolean isMultiDestDirectory(String src) throws IOException {
+  boolean isMultiDestDirectory(String src) throws IOException {
     try {
       if (rpcServer.isPathAll(src)) {
         List<RemoteLocation> locations;
@@ -2472,56 +2449,4 @@ public class RouterClientProtocol implements ClientProtocol {
   public int getRouterFederationRenameCount() {
     return rbfRename.getRouterFederationRenameCount();
   }
-
-  public RouterRpcServer getRpcServer() {
-    return rpcServer;
-  }
-
-  public RouterRpcClient getRpcClient() {
-    return rpcClient;
-  }
-
-  public FileSubclusterResolver getSubclusterResolver() {
-    return subclusterResolver;
-  }
-
-  public ActiveNamenodeResolver getNamenodeResolver() {
-    return namenodeResolver;
-  }
-
-  public long getServerDefaultsLastUpdate() {
-    return serverDefaultsLastUpdate;
-  }
-
-  public long getServerDefaultsValidityPeriod() {
-    return serverDefaultsValidityPeriod;
-  }
-
-  public boolean isAllowPartialList() {
-    return allowPartialList;
-  }
-
-  public long getMountStatusTimeOut() {
-    return mountStatusTimeOut;
-  }
-
-  public String getSuperUser() {
-    return superUser;
-  }
-
-  public String getSuperGroup() {
-    return superGroup;
-  }
-
-  public RouterStoragePolicy getStoragePolicy() {
-    return storagePolicy;
-  }
-
-  public void setServerDefaultsLastUpdate(long serverDefaultsLastUpdate) {
-    this.serverDefaultsLastUpdate = serverDefaultsLastUpdate;
-  }
-
-  public RouterFederationRename getRbfRename() {
-    return rbfRename;
-  }
 }

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterFederationRename.java

@@ -93,7 +93,7 @@ public class RouterFederationRename {
    * @throws IOException if rename fails.
    * @return true if rename succeeds.
    */
-  public boolean routerFedRename(final String src, final String dst,
+  boolean routerFedRename(final String src, final String dst,
       final List<RemoteLocation> srcLocations,
       final List<RemoteLocation> dstLocations) throws IOException {
     if (!rpcServer.isEnableRenameAcrossNamespace()) {

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java

@@ -1969,7 +1969,7 @@ public class RouterRpcClient {
    * @param nsId namespaceID
    * @return whether the 'namespace' has observer reads enabled.
    */
-  public boolean isNamespaceObserverReadEligible(String nsId) {
+  boolean isNamespaceObserverReadEligible(String nsId) {
     return observerReadEnabledDefault != observerReadEnabledOverrides.contains(nsId);
   }
 

+ 5 - 15
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java

@@ -75,11 +75,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.HAUtil;
 import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
 import org.apache.hadoop.hdfs.protocolPB.AsyncRpcProtocolPBUtil;
-import org.apache.hadoop.hdfs.server.federation.router.async.AsyncQuota;
-import org.apache.hadoop.hdfs.server.federation.router.async.RouterAsyncClientProtocol;
-import org.apache.hadoop.hdfs.server.federation.router.async.RouterAsyncNamenodeProtocol;
 import org.apache.hadoop.hdfs.server.federation.router.async.RouterAsyncRpcClient;
-import org.apache.hadoop.hdfs.server.federation.router.async.RouterAsyncUserProtocol;
 import org.apache.hadoop.hdfs.server.federation.router.async.utils.ApplyFunction;
 import org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncCatchFunction;
 import org.apache.hadoop.hdfs.server.federation.router.async.utils.CatchFunction;
@@ -292,7 +288,6 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol,
    * @param fileResolver File resolver to resolve file paths to subclusters.
    * @throws IOException If the RPC server could not be created.
    */
-  @SuppressWarnings("checkstyle:MethodLength")
   public RouterRpcServer(Configuration conf, Router router,
       ActiveNamenodeResolver nnResolver, FileSubclusterResolver fileResolver)
           throws IOException {
@@ -429,19 +424,14 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol,
     if (this.enableAsync) {
       this.rpcClient = new RouterAsyncRpcClient(this.conf, this.router,
           this.namenodeResolver, this.rpcMonitor, routerStateIdContext);
-      this.clientProto = new RouterAsyncClientProtocol(conf, this);
-      this.nnProto = new RouterAsyncNamenodeProtocol(this);
-      this.routerProto = new RouterAsyncUserProtocol(this);
-      this.quotaCall = new AsyncQuota(this.router, this);
     } else {
       this.rpcClient = new RouterRpcClient(this.conf, this.router,
           this.namenodeResolver, this.rpcMonitor, routerStateIdContext);
-      this.clientProto = new RouterClientProtocol(conf, this);
-      this.nnProto = new RouterNamenodeProtocol(this);
-      this.routerProto = new RouterUserProtocol(this);
-      this.quotaCall = new Quota(this.router, this);
     }
-
+    this.nnProto = new RouterNamenodeProtocol(this);
+    this.quotaCall = new Quota(this.router, this);
+    this.clientProto = new RouterClientProtocol(conf, this);
+    this.routerProto = new RouterUserProtocol(this);
     long dnCacheExpire = conf.getTimeDuration(
         DN_REPORT_CACHE_EXPIRE,
         DN_REPORT_CACHE_EXPIRE_MS_DEFAULT, TimeUnit.MILLISECONDS);
@@ -2203,7 +2193,7 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol,
    * @param path Path to check.
    * @return If a path should be in all subclusters.
    */
-  public boolean isPathAll(final String path) {
+  boolean isPathAll(final String path) {
     MountTable entry = getMountTable(path);
     return entry != null && entry.isAll();
   }

+ 0 - 1083
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncClientProtocol.java

@@ -1,1083 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.server.federation.router.async;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.crypto.CryptoProtocolVersion;
-import org.apache.hadoop.fs.ContentSummary;
-import org.apache.hadoop.fs.CreateFlag;
-import org.apache.hadoop.fs.FsServerDefaults;
-import org.apache.hadoop.fs.Options;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.hdfs.DFSUtil;
-import org.apache.hadoop.hdfs.protocol.ClientProtocol;
-import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.protocol.DirectoryListing;
-import org.apache.hadoop.hdfs.protocol.HdfsConstants;
-import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
-import org.apache.hadoop.hdfs.protocol.LastBlockWithStatus;
-import org.apache.hadoop.hdfs.protocol.ReplicatedBlockStats;
-import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
-import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
-import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
-import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamespaceInfo;
-import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver;
-import org.apache.hadoop.hdfs.server.federation.resolver.MountTableResolver;
-import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
-import org.apache.hadoop.hdfs.server.federation.resolver.RouterResolveException;
-import org.apache.hadoop.hdfs.server.federation.router.NoLocationException;
-import org.apache.hadoop.hdfs.server.federation.router.RemoteMethod;
-import org.apache.hadoop.hdfs.server.federation.router.RemoteParam;
-import org.apache.hadoop.hdfs.server.federation.router.RemoteResult;
-import org.apache.hadoop.hdfs.server.federation.router.RouterClientProtocol;
-import org.apache.hadoop.hdfs.server.federation.router.RouterFederationRename;
-import org.apache.hadoop.hdfs.server.federation.router.RouterRpcClient;
-import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer;
-import org.apache.hadoop.hdfs.server.federation.router.async.utils.ApplyFunction;
-import org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncApplyFunction;
-import org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncCatchFunction;
-import org.apache.hadoop.hdfs.server.federation.router.async.utils.CatchFunction;
-import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
-import org.apache.hadoop.hdfs.server.namenode.NameNode;
-import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
-import org.apache.hadoop.io.EnumSetWritable;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.util.Time;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.EnumSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.concurrent.CompletableFuture;
-import java.util.stream.Collectors;
-
-import static org.apache.hadoop.hdfs.server.federation.router.FederationUtil.updateMountPointStatus;
-import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncApply;
-import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncCatch;
-import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncComplete;
-import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncCompleteWith;
-import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncForEach;
-import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncReturn;
-import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncTry;
-import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.getCompletableFuture;
-
-/**
- * Module that implements all the async RPC calls in {@link ClientProtocol} in the
- * {@link RouterRpcServer}.
- */
-public class RouterAsyncClientProtocol extends RouterClientProtocol {
-  private static final Logger LOG =
-      LoggerFactory.getLogger(RouterAsyncClientProtocol.class.getName());
-
-  private final RouterRpcServer rpcServer;
-  private final RouterRpcClient rpcClient;
-  private final RouterFederationRename rbfRename;
-  private final FileSubclusterResolver subclusterResolver;
-  private final ActiveNamenodeResolver namenodeResolver;
-  /** If it requires response from all subclusters. */
-  private final boolean allowPartialList;
-  /** Time out when getting the mount statistics. */
-  private long mountStatusTimeOut;
-  /** Identifier for the super user. */
-  private String superUser;
-  /** Identifier for the super group. */
-  private final String superGroup;
-  /**
-   * Caching server defaults so as to prevent redundant calls to namenode,
-   * similar to DFSClient, caching saves efforts when router connects
-   * to multiple clients.
-   */
-  private volatile FsServerDefaults serverDefaults;
-
-  public RouterAsyncClientProtocol(Configuration conf, RouterRpcServer rpcServer) {
-    super(conf, rpcServer);
-    this.rpcServer = rpcServer;
-    this.rpcClient = rpcServer.getRPCClient();
-    this.rbfRename = getRbfRename();
-    this.subclusterResolver = getSubclusterResolver();
-    this.namenodeResolver = getNamenodeResolver();
-    this.allowPartialList = isAllowPartialList();
-    this.mountStatusTimeOut = getMountStatusTimeOut();
-    this.superUser = getSuperUser();
-    this.superGroup = getSuperGroup();
-  }
-
-  @Override
-  public FsServerDefaults getServerDefaults() throws IOException {
-    rpcServer.checkOperation(NameNode.OperationCategory.READ);
-    long now = Time.monotonicNow();
-    if ((serverDefaults == null) || (now - getServerDefaultsLastUpdate()
-        > getServerDefaultsValidityPeriod())) {
-      RemoteMethod method = new RemoteMethod("getServerDefaults");
-      rpcServer.invokeAtAvailableNsAsync(method, FsServerDefaults.class);
-      asyncApply(o -> {
-        serverDefaults = (FsServerDefaults) o;
-        setServerDefaultsLastUpdate(now);
-        return serverDefaults;
-      });
-    } else {
-      asyncComplete(serverDefaults);
-    }
-    return asyncReturn(FsServerDefaults.class);
-  }
-
-  @Override
-  public HdfsFileStatus create(String src, FsPermission masked,
-      String clientName, EnumSetWritable<CreateFlag> flag,
-      boolean createParent, short replication, long blockSize,
-      CryptoProtocolVersion[] supportedVersions, String ecPolicyName,
-      String storagePolicy) throws IOException {
-    rpcServer.checkOperation(NameNode.OperationCategory.WRITE);
-
-    if (createParent && rpcServer.isPathAll(src)) {
-      int index = src.lastIndexOf(Path.SEPARATOR);
-      String parent = src.substring(0, index);
-      LOG.debug("Creating {} requires creating parent {}", src, parent);
-      FsPermission parentPermissions = getParentPermission(masked);
-      mkdirs(parent, parentPermissions, createParent);
-      asyncApply((ApplyFunction<Boolean, Boolean>) success -> {
-        if (!success) {
-          // This shouldn't happen as mkdirs returns true or exception
-          LOG.error("Couldn't create parents for {}", src);
-        }
-        return success;
-      });
-    }
-
-    RemoteMethod method = new RemoteMethod("create",
-        new Class<?>[] {String.class, FsPermission.class, String.class,
-            EnumSetWritable.class, boolean.class, short.class,
-            long.class, CryptoProtocolVersion[].class,
-            String.class, String.class},
-        new RemoteParam(), masked, clientName, flag, createParent,
-        replication, blockSize, supportedVersions, ecPolicyName, storagePolicy);
-    final List<RemoteLocation> locations =
-        rpcServer.getLocationsForPath(src, true);
-    final RemoteLocation[] createLocation = new RemoteLocation[1];
-    asyncTry(() -> {
-      rpcServer.getCreateLocationAsync(src, locations);
-      asyncApply((AsyncApplyFunction<RemoteLocation, Object>) remoteLocation -> {
-        createLocation[0] = remoteLocation;
-        rpcClient.invokeSingle(remoteLocation, method, HdfsFileStatus.class);
-        asyncApply((ApplyFunction<HdfsFileStatus, Object>) status -> {
-          status.setNamespace(remoteLocation.getNameserviceId());
-          return status;
-        });
-      });
-    });
-    asyncCatch((AsyncCatchFunction<Object, IOException>) (o, ioe) -> {
-      final List<RemoteLocation> newLocations = checkFaultTolerantRetry(
-          method, src, ioe, createLocation[0], locations);
-      rpcClient.invokeSequential(
-          newLocations, method, HdfsFileStatus.class, null);
-    }, IOException.class);
-
-    return asyncReturn(HdfsFileStatus.class);
-  }
-
-  @Override
-  public LastBlockWithStatus append(
-      String src, String clientName,
-      EnumSetWritable<CreateFlag> flag) throws IOException {
-    rpcServer.checkOperation(NameNode.OperationCategory.WRITE);
-
-    List<RemoteLocation> locations = rpcServer.getLocationsForPath(src, true);
-    RemoteMethod method = new RemoteMethod("append",
-        new Class<?>[] {String.class, String.class, EnumSetWritable.class},
-        new RemoteParam(), clientName, flag);
-    rpcClient.invokeSequential(method, locations, LastBlockWithStatus.class, null);
-    asyncApply((ApplyFunction<RemoteResult, LastBlockWithStatus>) result -> {
-      LastBlockWithStatus lbws = (LastBlockWithStatus) result.getResult();
-      lbws.getFileStatus().setNamespace(result.getLocation().getNameserviceId());
-      return lbws;
-    });
-    return asyncReturn(LastBlockWithStatus.class);
-  }
-
-  @Deprecated
-  @Override
-  public boolean rename(final String src, final String dst)
-      throws IOException {
-    rpcServer.checkOperation(NameNode.OperationCategory.WRITE);
-
-    final List<RemoteLocation> srcLocations =
-        rpcServer.getLocationsForPath(src, true, false);
-    final List<RemoteLocation> dstLocations =
-        rpcServer.getLocationsForPath(dst, false, false);
-    // srcLocations may be trimmed by getRenameDestinations()
-    final List<RemoteLocation> locs = new LinkedList<>(srcLocations);
-    RemoteParam dstParam = getRenameDestinations(locs, dstLocations);
-    if (locs.isEmpty()) {
-      asyncComplete(
-          rbfRename.routerFedRename(src, dst, srcLocations, dstLocations));
-      return asyncReturn(Boolean.class);
-    }
-    RemoteMethod method = new RemoteMethod("rename",
-        new Class<?>[] {String.class, String.class},
-        new RemoteParam(), dstParam);
-    isMultiDestDirectory(src);
-    asyncApply((AsyncApplyFunction<Boolean, Boolean>) isMultiDestDirectory -> {
-      if (isMultiDestDirectory) {
-        if (locs.size() != srcLocations.size()) {
-          throw new IOException("Rename of " + src + " to " + dst + " is not"
-              + " allowed. The number of remote locations for both source and"
-              + " target should be same.");
-        }
-        rpcClient.invokeAll(locs, method);
-      } else {
-        rpcClient.invokeSequential(locs, method, Boolean.class,
-            Boolean.TRUE);
-      }
-    });
-    return asyncReturn(Boolean.class);
-  }
-
-  @Override
-  public void rename2(
-      final String src, final String dst,
-      final Options.Rename... options) throws IOException {
-    rpcServer.checkOperation(NameNode.OperationCategory.WRITE);
-
-    final List<RemoteLocation> srcLocations =
-        rpcServer.getLocationsForPath(src, true, false);
-    final List<RemoteLocation> dstLocations =
-        rpcServer.getLocationsForPath(dst, false, false);
-    // srcLocations may be trimmed by getRenameDestinations()
-    final List<RemoteLocation> locs = new LinkedList<>(srcLocations);
-    RemoteParam dstParam = getRenameDestinations(locs, dstLocations);
-    if (locs.isEmpty()) {
-      rbfRename.routerFedRename(src, dst, srcLocations, dstLocations);
-      return;
-    }
-    RemoteMethod method = new RemoteMethod("rename2",
-        new Class<?>[] {String.class, String.class, options.getClass()},
-        new RemoteParam(), dstParam, options);
-    isMultiDestDirectory(src);
-    asyncApply((AsyncApplyFunction<Boolean, Boolean>) isMultiDestDirectory -> {
-      if (isMultiDestDirectory) {
-        if (locs.size() != srcLocations.size()) {
-          throw new IOException("Rename of " + src + " to " + dst + " is not"
-              + " allowed. The number of remote locations for both source and"
-              + " target should be same.");
-        }
-        rpcClient.invokeConcurrent(locs, method);
-      } else {
-        rpcClient.invokeSequential(locs, method, null, null);
-      }
-    });
-  }
-
-  @Override
-  public void concat(String trg, String[] src) throws IOException {
-    rpcServer.checkOperation(NameNode.OperationCategory.WRITE);
-
-    // Concat only effects when all files in the same namespace.
-    getFileRemoteLocation(trg);
-    asyncApply((AsyncApplyFunction<RemoteLocation, Object>) targetDestination -> {
-      if (targetDestination == null) {
-        throw new IOException("Cannot find target file - " + trg);
-      }
-      String targetNameService = targetDestination.getNameserviceId();
-      String[] sourceDestinations = new String[src.length];
-      int[] index = new int[1];
-      asyncForEach(Arrays.stream(src).iterator(), (forEachRun, sourceFile) -> {
-        getFileRemoteLocation(sourceFile);
-        asyncApply((ApplyFunction<RemoteLocation, Object>) srcLocation -> {
-          if (srcLocation == null) {
-            throw new IOException("Cannot find source file - " + sourceFile);
-          }
-          sourceDestinations[index[0]++] = srcLocation.getDest();
-          if (!targetNameService.equals(srcLocation.getNameserviceId())) {
-            throw new IOException("Cannot concatenate source file " + sourceFile
-                + " because it is located in a different namespace" + " with nameservice "
-                + srcLocation.getNameserviceId() + " from the target file with nameservice "
-                + targetNameService);
-          }
-          return null;
-        });
-      });
-      asyncApply((AsyncApplyFunction<Object, Object>) o -> {
-        // Invoke
-        RemoteMethod method = new RemoteMethod("concat",
-            new Class<?>[] {String.class, String[].class},
-            targetDestination.getDest(), sourceDestinations);
-        rpcClient.invokeSingle(targetDestination, method, Void.class);
-      });
-    });
-  }
-
-  @Override
-  public boolean mkdirs(String src, FsPermission masked, boolean createParent)
-      throws IOException {
-    rpcServer.checkOperation(NameNode.OperationCategory.WRITE);
-
-    final List<RemoteLocation> locations =
-        rpcServer.getLocationsForPath(src, false);
-    RemoteMethod method = new RemoteMethod("mkdirs",
-        new Class<?>[] {String.class, FsPermission.class, boolean.class},
-        new RemoteParam(), masked, createParent);
-
-    // Create in all locations
-    if (rpcServer.isPathAll(src)) {
-      return rpcClient.invokeAll(locations, method);
-    }
-
-    asyncComplete(false);
-    if (locations.size() > 1) {
-      // Check if this directory already exists
-      asyncTry(() -> {
-        getFileInfo(src);
-        asyncApply((ApplyFunction<HdfsFileStatus, Boolean>) fileStatus -> {
-          if (fileStatus != null) {
-            // When existing, the NN doesn't return an exception; return true
-            return true;
-          }
-          return false;
-        });
-      });
-      asyncCatch((ret, ex) -> {
-        // Can't query if this file exists or not.
-        LOG.error("Error getting file info for {} while proxying mkdirs: {}",
-            src, ex.getMessage());
-        return false;
-      }, IOException.class);
-    }
-
-    final RemoteLocation firstLocation = locations.get(0);
-    asyncApply((AsyncApplyFunction<Boolean, Boolean>) success -> {
-      if (success) {
-        asyncComplete(true);
-        return;
-      }
-      asyncTry(() -> {
-        rpcClient.invokeSingle(firstLocation, method, Boolean.class);
-      });
-
-      asyncCatch((AsyncCatchFunction<Object, IOException>) (o, ioe) -> {
-        final List<RemoteLocation> newLocations = checkFaultTolerantRetry(
-            method, src, ioe, firstLocation, locations);
-        rpcClient.invokeSequential(
-            newLocations, method, Boolean.class, Boolean.TRUE);
-      }, IOException.class);
-    });
-
-    return asyncReturn(Boolean.class);
-  }
-
-  @Override
-  public DirectoryListing getListing(
-      String src, byte[] startAfter, boolean needLocation) throws IOException {
-    rpcServer.checkOperation(NameNode.OperationCategory.READ);
-    GetListingComparator comparator = RouterClientProtocol.getComparator();
-    getListingInt(src, startAfter, needLocation);
-    asyncApply((AsyncApplyFunction<List<RemoteResult<RemoteLocation, DirectoryListing>>, Object>)
-        listings -> {
-        TreeMap<byte[], HdfsFileStatus> nnListing = new TreeMap<>(comparator);
-        int totalRemainingEntries = 0;
-        final int[] remainingEntries = {0};
-        boolean namenodeListingExists = false;
-        // Check the subcluster listing with the smallest name to make sure
-        // no file is skipped across subclusters
-        byte[] lastName = null;
-        if (listings != null) {
-          for (RemoteResult<RemoteLocation, DirectoryListing> result : listings) {
-            if (result.hasException()) {
-              IOException ioe = result.getException();
-              if (ioe instanceof FileNotFoundException) {
-                RemoteLocation location = result.getLocation();
-                LOG.debug("Cannot get listing from {}", location);
-              } else if (!allowPartialList) {
-                throw ioe;
-              }
-            } else if (result.getResult() != null) {
-              DirectoryListing listing = result.getResult();
-              totalRemainingEntries += listing.getRemainingEntries();
-              HdfsFileStatus[] partialListing = listing.getPartialListing();
-              int length = partialListing.length;
-              if (length > 0) {
-                HdfsFileStatus lastLocalEntry = partialListing[length-1];
-                byte[] lastLocalName = lastLocalEntry.getLocalNameInBytes();
-                if (lastName == null ||
-                    comparator.compare(lastName, lastLocalName) > 0) {
-                  lastName = lastLocalName;
-                }
-              }
-            }
-          }
-
-          // Add existing entries
-          for (RemoteResult<RemoteLocation, DirectoryListing> result : listings) {
-            DirectoryListing listing = result.getResult();
-            if (listing != null) {
-              namenodeListingExists = true;
-              for (HdfsFileStatus file : listing.getPartialListing()) {
-                byte[] filename = file.getLocalNameInBytes();
-                if (totalRemainingEntries > 0 &&
-                    comparator.compare(filename, lastName) > 0) {
-                  // Discarding entries further than the lastName
-                  remainingEntries[0]++;
-                } else {
-                  nnListing.put(filename, file);
-                }
-              }
-              remainingEntries[0] += listing.getRemainingEntries();
-            }
-          }
-        }
-
-        // Add mount points at this level in the tree
-        final List<String> children = subclusterResolver.getMountPoints(src);
-        if (children != null) {
-          // Get the dates for each mount point
-          Map<String, Long> dates = getMountPointDates(src);
-          byte[] finalLastName = lastName;
-          asyncForEach(children.iterator(), (forEachRun, child) -> {
-            long date = 0;
-            if (dates != null && dates.containsKey(child)) {
-              date = dates.get(child);
-            }
-            Path childPath = new Path(src, child);
-            getMountPointStatus(childPath.toString(), 0, date);
-            asyncApply((ApplyFunction<HdfsFileStatus, Object>) dirStatus -> {
-              // if there is no subcluster path, always add mount point
-              byte[] bChild = DFSUtil.string2Bytes(child);
-              if (finalLastName == null) {
-                nnListing.put(bChild, dirStatus);
-              } else {
-                if (shouldAddMountPoint(bChild,
-                    finalLastName, startAfter, remainingEntries[0])) {
-                  // This may overwrite existing listing entries with the mount point
-                  // TODO don't add if already there?
-                  nnListing.put(bChild, dirStatus);
-                }
-              }
-              return null;
-            });
-          });
-          asyncApply(o -> {
-            // Update the remaining count to include left mount points
-            if (nnListing.size() > 0) {
-              byte[] lastListing = nnListing.lastKey();
-              for (int i = 0; i < children.size(); i++) {
-                byte[] bChild = DFSUtil.string2Bytes(children.get(i));
-                if (comparator.compare(bChild, lastListing) > 0) {
-                  remainingEntries[0] += (children.size() - i);
-                  break;
-                }
-              }
-            }
-            return null;
-          });
-        }
-        asyncComplete(namenodeListingExists);
-        asyncApply((ApplyFunction<Boolean, Object>) exists -> {
-          if (!exists && nnListing.size() == 0 && children == null) {
-            // NN returns a null object if the directory cannot be found and has no
-            // listing. If we didn't retrieve any NN listing data, and there are no
-            // mount points here, return null.
-            return null;
-          }
-
-          // Generate combined listing
-          HdfsFileStatus[] combinedData = new HdfsFileStatus[nnListing.size()];
-          combinedData = nnListing.values().toArray(combinedData);
-          return new DirectoryListing(combinedData, remainingEntries[0]);
-        });
-      });
-    return asyncReturn(DirectoryListing.class);
-  }
-
-  /**
-   * Get listing on remote locations.
-   */
-  @Override
-  protected List<RemoteResult<RemoteLocation, DirectoryListing>> getListingInt(
-      String src, byte[] startAfter, boolean needLocation) throws IOException {
-    List<RemoteLocation> locations =
-        rpcServer.getLocationsForPath(src, false, false);
-    // Locate the dir and fetch the listing.
-    if (locations.isEmpty()) {
-      asyncComplete(new ArrayList<>());
-      return asyncReturn(List.class);
-    }
-    asyncTry(() -> {
-      RemoteMethod method = new RemoteMethod("getListing",
-          new Class<?>[] {String.class, startAfter.getClass(), boolean.class},
-          new RemoteParam(), startAfter, needLocation);
-      rpcClient.invokeConcurrent(locations, method, false, -1,
-          DirectoryListing.class);
-    });
-    asyncCatch((CatchFunction<List, RouterResolveException>) (o, e) -> {
-      LOG.debug("Cannot get locations for {}, {}.", src, e.getMessage());
-      LOG.info("Cannot get locations for {}, {}.", src, e.getMessage());
-      return new ArrayList<>();
-    }, RouterResolveException.class);
-    return asyncReturn(List.class);
-  }
-
-
-  @Override
-  public HdfsFileStatus getFileInfo(String src) throws IOException {
-    rpcServer.checkOperation(NameNode.OperationCategory.READ);
-
-    final IOException[] noLocationException = new IOException[1];
-    asyncTry(() -> {
-      final List<RemoteLocation> locations = rpcServer.getLocationsForPath(src, false, false);
-      RemoteMethod method = new RemoteMethod("getFileInfo",
-          new Class<?>[] {String.class}, new RemoteParam());
-      // If it's a directory, we check in all locations
-      if (rpcServer.isPathAll(src)) {
-        getFileInfoAll(locations, method);
-      } else {
-        // Check for file information sequentially
-        rpcClient.invokeSequential(locations, method, HdfsFileStatus.class, null);
-      }
-    });
-    asyncCatch((o, e) -> {
-      if (e instanceof NoLocationException
-          || e instanceof RouterResolveException) {
-        noLocationException[0] = e;
-      }
-      throw e;
-    }, IOException.class);
-
-    asyncApply((AsyncApplyFunction<HdfsFileStatus, Object>) ret -> {
-      // If there is no real path, check mount points
-      if (ret == null) {
-        List<String> children = subclusterResolver.getMountPoints(src);
-        if (children != null && !children.isEmpty()) {
-          Map<String, Long> dates = getMountPointDates(src);
-          long date = 0;
-          if (dates != null && dates.containsKey(src)) {
-            date = dates.get(src);
-          }
-          getMountPointStatus(src, children.size(), date, false);
-        } else if (children != null) {
-          // The src is a mount point, but there are no files or directories
-          getMountPointStatus(src, 0, 0, false);
-        } else {
-          asyncComplete(null);
-        }
-        asyncApply((ApplyFunction<HdfsFileStatus, HdfsFileStatus>) result -> {
-          // Can't find mount point for path and the path didn't contain any sub monit points,
-          // throw the NoLocationException to client.
-          if (result == null && noLocationException[0] != null) {
-            throw noLocationException[0];
-          }
-
-          return result;
-        });
-      } else {
-        asyncComplete(ret);
-      }
-    });
-
-    return asyncReturn(HdfsFileStatus.class);
-  }
-
-  @Override
-  public RemoteLocation getFileRemoteLocation(String path) throws IOException {
-    rpcServer.checkOperation(NameNode.OperationCategory.READ);
-
-    final List<RemoteLocation> locations = rpcServer.getLocationsForPath(path, false, false);
-    if (locations.size() == 1) {
-      asyncComplete(locations.get(0));
-      return asyncReturn(RemoteLocation.class);
-    }
-
-    asyncForEach(locations.iterator(), (forEachRun, location) -> {
-      RemoteMethod method =
-          new RemoteMethod("getFileInfo", new Class<?>[] {String.class}, new RemoteParam());
-      rpcClient.invokeSequential(Collections.singletonList(location), method,
-          HdfsFileStatus.class, null);
-      asyncApply((ApplyFunction<HdfsFileStatus, RemoteLocation>) ret -> {
-        if (ret != null) {
-          forEachRun.breakNow();
-          return location;
-        }
-        return null;
-      });
-    });
-
-    return asyncReturn(RemoteLocation.class);
-  }
-
-  @Override
-  public HdfsFileStatus getMountPointStatus(
-      String name, int childrenNum, long date, boolean setPath) {
-    long modTime = date;
-    long accessTime = date;
-    final FsPermission[] permission = new FsPermission[]{FsPermission.getDirDefault()};
-    final String[] owner = new String[]{this.superUser};
-    final String[] group = new String[]{this.superGroup};
-    final int[] childrenNums = new int[]{childrenNum};
-    final EnumSet<HdfsFileStatus.Flags>[] flags =
-        new EnumSet[]{EnumSet.noneOf(HdfsFileStatus.Flags.class)};
-    asyncComplete(null);
-    if (getSubclusterResolver() instanceof MountTableResolver) {
-      asyncTry(() -> {
-        String mName = name.startsWith("/") ? name : "/" + name;
-        MountTableResolver mountTable = (MountTableResolver) subclusterResolver;
-        MountTable entry = mountTable.getMountPoint(mName);
-        if (entry != null) {
-          permission[0] = entry.getMode();
-          owner[0] = entry.getOwnerName();
-          group[0] = entry.getGroupName();
-
-          RemoteMethod method = new RemoteMethod("getFileInfo",
-              new Class<?>[] {String.class}, new RemoteParam());
-          getFileInfoAll(
-              entry.getDestinations(), method, mountStatusTimeOut);
-          asyncApply((ApplyFunction<HdfsFileStatus, HdfsFileStatus>) fInfo -> {
-            if (fInfo != null) {
-              permission[0] = fInfo.getPermission();
-              owner[0] = fInfo.getOwner();
-              group[0] = fInfo.getGroup();
-              childrenNums[0] = fInfo.getChildrenNum();
-              flags[0] = DFSUtil
-                  .getFlags(fInfo.isEncrypted(), fInfo.isErasureCoded(),
-                      fInfo.isSnapshotEnabled(), fInfo.hasAcl());
-            }
-            return fInfo;
-          });
-        }
-      });
-      asyncCatch((CatchFunction<HdfsFileStatus, IOException>) (status, e) -> {
-        LOG.error("Cannot get mount point: {}", e.getMessage());
-        return status;
-      }, IOException.class);
-    } else {
-      try {
-        UserGroupInformation ugi = RouterRpcServer.getRemoteUser();
-        owner[0] = ugi.getUserName();
-        group[0] = ugi.getPrimaryGroupName();
-      } catch (IOException e) {
-        String msg = "Cannot get remote user: " + e.getMessage();
-        if (UserGroupInformation.isSecurityEnabled()) {
-          LOG.error(msg);
-        } else {
-          LOG.debug(msg);
-        }
-      }
-    }
-    long inodeId = 0;
-    HdfsFileStatus.Builder builder = new HdfsFileStatus.Builder();
-    asyncApply((ApplyFunction<HdfsFileStatus, HdfsFileStatus>) status -> {
-      if (setPath) {
-        Path path = new Path(name);
-        String nameStr = path.getName();
-        builder.path(DFSUtil.string2Bytes(nameStr));
-      }
-
-      return builder.isdir(true)
-          .mtime(modTime)
-          .atime(accessTime)
-          .perm(permission[0])
-          .owner(owner[0])
-          .group(group[0])
-          .symlink(new byte[0])
-          .fileId(inodeId)
-          .children(childrenNums[0])
-          .flags(flags[0])
-          .build();
-    });
-    return asyncReturn(HdfsFileStatus.class);
-  }
-
-  @Override
-  protected HdfsFileStatus getFileInfoAll(final List<RemoteLocation> locations,
-      final RemoteMethod method, long timeOutMs) throws IOException {
-
-    asyncComplete(null);
-    // Get the file info from everybody
-    rpcClient.invokeConcurrent(locations, method, false, false, timeOutMs,
-        HdfsFileStatus.class);
-    asyncApply(res -> {
-      Map<RemoteLocation, HdfsFileStatus> results = (Map<RemoteLocation, HdfsFileStatus>) res;
-      int children = 0;
-      // We return the first file
-      HdfsFileStatus dirStatus = null;
-      for (RemoteLocation loc : locations) {
-        HdfsFileStatus fileStatus = results.get(loc);
-        if (fileStatus != null) {
-          children += fileStatus.getChildrenNum();
-          if (!fileStatus.isDirectory()) {
-            return fileStatus;
-          } else if (dirStatus == null) {
-            dirStatus = fileStatus;
-          }
-        }
-      }
-      if (dirStatus != null) {
-        return updateMountPointStatus(dirStatus, children);
-      }
-      return null;
-    });
-    return asyncReturn(HdfsFileStatus.class);
-  }
-
-  @Override
-  public boolean recoverLease(String src, String clientName) throws IOException {
-    super.recoverLease(src, clientName);
-    return asyncReturn(boolean.class);
-  }
-
-  @Override
-  public long[] getStats() throws IOException {
-    rpcServer.checkOperation(NameNode.OperationCategory.UNCHECKED);
-
-    RemoteMethod method = new RemoteMethod("getStats");
-    Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
-    rpcClient.invokeConcurrent(nss, method, true, false, long[].class);
-    asyncApply(o -> {
-      Map<FederationNamespaceInfo, long[]> results
-          = (Map<FederationNamespaceInfo, long[]>) o;
-      long[] combinedData = new long[STATS_ARRAY_LENGTH];
-      for (long[] data : results.values()) {
-        for (int i = 0; i < combinedData.length && i < data.length; i++) {
-          if (data[i] >= 0) {
-            combinedData[i] += data[i];
-          }
-        }
-      }
-      return combinedData;
-    });
-    return asyncReturn(long[].class);
-  }
-
-  @Override
-  public ReplicatedBlockStats getReplicatedBlockStats() throws IOException {
-    rpcServer.checkOperation(NameNode.OperationCategory.READ);
-
-    RemoteMethod method = new RemoteMethod("getReplicatedBlockStats");
-    Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
-    rpcClient.invokeConcurrent(nss, method, true,
-        false, ReplicatedBlockStats.class);
-    asyncApply(o -> {
-      Map<FederationNamespaceInfo, ReplicatedBlockStats> ret =
-          (Map<FederationNamespaceInfo, ReplicatedBlockStats>) o;
-      return ReplicatedBlockStats.merge(ret.values());
-    });
-    return asyncReturn(ReplicatedBlockStats.class);
-  }
-
-  @Override
-  public DatanodeInfo[] getDatanodeReport(HdfsConstants.DatanodeReportType type)
-      throws IOException {
-    rpcServer.checkOperation(NameNode.OperationCategory.UNCHECKED);
-    return rpcServer.getDatanodeReportAsync(type, true, 0);
-  }
-
-  @Override
-  public DatanodeStorageReport[] getDatanodeStorageReport(
-      HdfsConstants.DatanodeReportType type) throws IOException {
-    rpcServer.checkOperation(NameNode.OperationCategory.UNCHECKED);
-
-    rpcServer.getDatanodeStorageReportMapAsync(type);
-    asyncApply((ApplyFunction< Map<String, DatanodeStorageReport[]>, DatanodeStorageReport[]>)
-        dnSubcluster -> mergeDtanodeStorageReport(dnSubcluster));
-    return asyncReturn(DatanodeStorageReport[].class);
-  }
-
-  public DatanodeStorageReport[] getDatanodeStorageReport(
-      HdfsConstants.DatanodeReportType type, boolean requireResponse, long timeOutMs)
-      throws IOException {
-    rpcServer.checkOperation(NameNode.OperationCategory.UNCHECKED);
-
-    rpcServer.getDatanodeStorageReportMapAsync(type, requireResponse, timeOutMs);
-    asyncApply((ApplyFunction< Map<String, DatanodeStorageReport[]>, DatanodeStorageReport[]>)
-        dnSubcluster -> mergeDtanodeStorageReport(dnSubcluster));
-    return asyncReturn(DatanodeStorageReport[].class);
-  }
-
-  @Override
-  public boolean setSafeMode(HdfsConstants.SafeModeAction action,
-                             boolean isChecked) throws IOException {
-    rpcServer.checkOperation(NameNode.OperationCategory.WRITE);
-
-    // Set safe mode in all the name spaces
-    RemoteMethod method = new RemoteMethod("setSafeMode",
-        new Class<?>[] {HdfsConstants.SafeModeAction.class, boolean.class},
-        action, isChecked);
-    Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
-    rpcClient.invokeConcurrent(
-        nss, method, true, !isChecked, Boolean.class);
-
-    asyncApply(o -> {
-      Map<FederationNamespaceInfo, Boolean> results
-          = (Map<FederationNamespaceInfo, Boolean>) o;
-      // We only report true if all the name space are in safe mode
-      int numSafemode = 0;
-      for (boolean safemode : results.values()) {
-        if (safemode) {
-          numSafemode++;
-        }
-      }
-      return numSafemode == results.size();
-    });
-    return asyncReturn(Boolean.class);
-  }
-
-  @Override
-  public boolean saveNamespace(long timeWindow, long txGap) throws IOException {
-    rpcServer.checkOperation(NameNode.OperationCategory.UNCHECKED);
-
-    RemoteMethod method = new RemoteMethod("saveNamespace",
-        new Class<?>[] {long.class, long.class}, timeWindow, txGap);
-    final Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
-    rpcClient.invokeConcurrent(nss, method, true,
-        false, boolean.class);
-
-    asyncApply(o -> {
-      Map<FederationNamespaceInfo, Boolean> ret =
-          (Map<FederationNamespaceInfo, Boolean>) o;
-      boolean success = true;
-      for (boolean s : ret.values()) {
-        if (!s) {
-          success = false;
-          break;
-        }
-      }
-      return success;
-    });
-    return asyncReturn(Boolean.class);
-  }
-
-  @Override
-  public long rollEdits() throws IOException {
-    rpcServer.checkOperation(NameNode.OperationCategory.WRITE);
-
-    RemoteMethod method = new RemoteMethod("rollEdits", new Class<?>[] {});
-    final Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
-    rpcClient.invokeConcurrent(nss, method, true, false, long.class);
-    asyncApply(o -> {
-      Map<FederationNamespaceInfo, Long> ret =
-          (Map<FederationNamespaceInfo, Long>) o;
-      // Return the maximum txid
-      long txid = 0;
-      for (long t : ret.values()) {
-        if (t > txid) {
-          txid = t;
-        }
-      }
-      return txid;
-    });
-    return asyncReturn(long.class);
-  }
-
-  @Override
-  public boolean restoreFailedStorage(String arg) throws IOException {
-    rpcServer.checkOperation(NameNode.OperationCategory.UNCHECKED);
-
-    RemoteMethod method = new RemoteMethod("restoreFailedStorage",
-        new Class<?>[] {String.class}, arg);
-    final Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
-    rpcClient.invokeConcurrent(nss, method, true, false, Boolean.class);
-    asyncApply(o -> {
-      Map<FederationNamespaceInfo, Boolean> ret =
-          (Map<FederationNamespaceInfo, Boolean>) o;
-      boolean success = true;
-      for (boolean s : ret.values()) {
-        if (!s) {
-          success = false;
-          break;
-        }
-      }
-      return success;
-    });
-    return asyncReturn(boolean.class);
-  }
-
-  @Override
-  public RollingUpgradeInfo rollingUpgrade(HdfsConstants.RollingUpgradeAction action)
-      throws IOException {
-    rpcServer.checkOperation(NameNode.OperationCategory.READ);
-
-    RemoteMethod method = new RemoteMethod("rollingUpgrade",
-        new Class<?>[] {HdfsConstants.RollingUpgradeAction.class}, action);
-    final Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
-
-    rpcClient.invokeConcurrent(
-        nss, method, true, false, RollingUpgradeInfo.class);
-    asyncApply(o -> {
-      Map<FederationNamespaceInfo, RollingUpgradeInfo> ret =
-          (Map<FederationNamespaceInfo, RollingUpgradeInfo>) o;
-      // Return the first rolling upgrade info
-      RollingUpgradeInfo info = null;
-      for (RollingUpgradeInfo infoNs : ret.values()) {
-        if (info == null && infoNs != null) {
-          info = infoNs;
-        }
-      }
-      return info;
-    });
-    return asyncReturn(RollingUpgradeInfo.class);
-  }
-
-  @Override
-  public ContentSummary getContentSummary(String path) throws IOException {
-    rpcServer.checkOperation(NameNode.OperationCategory.READ);
-
-    // Get the summaries from regular files
-    final Collection<ContentSummary> summaries = new ArrayList<>();
-    final List<RemoteLocation> locations = getLocationsForContentSummary(path);
-    final RemoteMethod method = new RemoteMethod("getContentSummary",
-        new Class<?>[] {String.class}, new RemoteParam());
-    rpcClient.invokeConcurrent(locations, method,
-        false, -1, ContentSummary.class);
-
-    asyncApply(o -> {
-      final List<RemoteResult<RemoteLocation, ContentSummary>> results =
-          (List<RemoteResult<RemoteLocation, ContentSummary>>) o;
-
-      FileNotFoundException notFoundException = null;
-      for (RemoteResult<RemoteLocation, ContentSummary> result : results) {
-        if (result.hasException()) {
-          IOException ioe = result.getException();
-          if (ioe instanceof FileNotFoundException) {
-            notFoundException = (FileNotFoundException)ioe;
-          } else if (!allowPartialList) {
-            throw ioe;
-          }
-        } else if (result.getResult() != null) {
-          summaries.add(result.getResult());
-        }
-      }
-
-      // Throw original exception if no original nor mount points
-      if (summaries.isEmpty() && notFoundException != null) {
-        throw notFoundException;
-      }
-      return aggregateContentSummary(summaries);
-    });
-
-    return asyncReturn(ContentSummary.class);
-  }
-
-  @Override
-  public long getCurrentEditLogTxid() throws IOException {
-    rpcServer.checkOperation(NameNode.OperationCategory.READ);
-
-    RemoteMethod method = new RemoteMethod(
-        "getCurrentEditLogTxid", new Class<?>[] {});
-    final Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
-    rpcClient.invokeConcurrent(nss, method, true, false, long.class);
-
-    asyncApply(o -> {
-      Map<FederationNamespaceInfo, Long> ret =
-          (Map<FederationNamespaceInfo, Long>) o;
-      // Return the maximum txid
-      long txid = 0;
-      for (long t : ret.values()) {
-        if (t > txid) {
-          txid = t;
-        }
-      }
-      return txid;
-    });
-    return asyncReturn(long.class);
-  }
-
-  @Override
-  public void msync() throws IOException {
-    rpcServer.checkOperation(NameNode.OperationCategory.READ, true);
-    // Only msync to nameservices with observer reads enabled.
-    Set<FederationNamespaceInfo> allNamespaces = namenodeResolver.getNamespaces();
-    RemoteMethod method = new RemoteMethod("msync");
-    Set<FederationNamespaceInfo> namespacesEligibleForObserverReads = allNamespaces
-        .stream()
-        .filter(ns -> rpcClient.isNamespaceObserverReadEligible(ns.getNameserviceId()))
-        .collect(Collectors.toSet());
-    if (namespacesEligibleForObserverReads.isEmpty()) {
-      asyncCompleteWith(CompletableFuture.completedFuture(null));
-      return;
-    }
-    rpcClient.invokeConcurrent(namespacesEligibleForObserverReads, method);
-  }
-
-  @Override
-  public boolean setReplication(String src, short replication)
-      throws IOException {
-    rpcServer.checkOperation(NameNode.OperationCategory.WRITE);
-
-    List<RemoteLocation> locations = rpcServer.getLocationsForPath(src, true);
-    RemoteMethod method = new RemoteMethod("setReplication",
-        new Class<?>[] {String.class, short.class}, new RemoteParam(),
-        replication);
-    if (rpcServer.isInvokeConcurrent(src)) {
-      rpcClient.invokeConcurrent(locations, method, Boolean.class);
-      asyncApply(o -> {
-        Map<RemoteLocation, Boolean> results = (Map<RemoteLocation, Boolean>) o;
-        return !results.containsValue(false);
-      });
-    } else {
-      rpcClient.invokeSequential(locations, method, Boolean.class,
-          Boolean.TRUE);
-    }
-    return asyncReturn(boolean.class);
-  }
-
-  /**
-   * Checks if the path is a directory and is supposed to be present in all
-   * subclusters.
-   * @param src the source path
-   * @return true if the path is directory and is supposed to be present in all
-   *         subclusters else false in all other scenarios.
-   * @throws IOException if unable to get the file status.
-   */
-  @Override
-  public boolean isMultiDestDirectory(String src) throws IOException {
-    try {
-      if (rpcServer.isPathAll(src)) {
-        List<RemoteLocation> locations;
-        locations = rpcServer.getLocationsForPath(src, false, false);
-        RemoteMethod method = new RemoteMethod("getFileInfo",
-            new Class<?>[] {String.class}, new RemoteParam());
-        rpcClient.invokeSequential(locations,
-            method, HdfsFileStatus.class, null);
-        CompletableFuture<Object> completableFuture = getCompletableFuture();
-        completableFuture = completableFuture.thenApply(o -> {
-          HdfsFileStatus fileStatus = (HdfsFileStatus) o;
-          if (fileStatus != null) {
-            return fileStatus.isDirectory();
-          } else {
-            LOG.debug("The destination {} doesn't exist.", src);
-          }
-          return false;
-        });
-        asyncCompleteWith(completableFuture);
-        return asyncReturn(Boolean.class);
-      }
-    } catch (UnresolvedPathException e) {
-      LOG.debug("The destination {} is a symlink.", src);
-    }
-    asyncCompleteWith(CompletableFuture.completedFuture(false));
-    return asyncReturn(Boolean.class);
-  }
-}

+ 2 - 4
hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncProtocolTestBase.java

@@ -56,7 +56,6 @@ public class RouterAsyncProtocolTestBase {
   private FileSystem routerFs;
   private RouterRpcServer routerRpcServer;
   private RouterRpcServer routerAsyncRpcServer;
-  protected static final String TEST_DIR_PATH = "/testdir";
 
   @BeforeClass
   public static void setUpCluster() throws Exception {
@@ -115,20 +114,19 @@ public class RouterAsyncProtocolTestBase {
         routerRpcServer.getRouterStateIdContext());
     routerAsyncRpcServer = Mockito.spy(routerRpcServer);
     Mockito.when(routerAsyncRpcServer.getRPCClient()).thenReturn(asyncRpcClient);
-    Mockito.when(routerAsyncRpcServer.isAsync()).thenReturn(true);
 
     // Create mock locations
     MockResolver resolver = (MockResolver) router.getRouter().getSubclusterResolver();
     resolver.addLocation("/", ns0, "/");
     FsPermission permission = new FsPermission("705");
-    routerFs.mkdirs(new Path(TEST_DIR_PATH), permission);
+    routerFs.mkdirs(new Path("/testdir"), permission);
   }
 
   @After
   public void tearDown() throws IOException {
     // clear client context
     CallerContext.setCurrent(null);
-    boolean delete = routerFs.delete(new Path(TEST_DIR_PATH));
+    boolean delete = routerFs.delete(new Path("/testdir"));
     assertTrue(delete);
     if (routerFs != null) {
       routerFs.close();

+ 0 - 144
hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncClientProtocol.java

@@ -1,144 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.server.federation.router.async;
-
-import org.apache.hadoop.crypto.CryptoProtocolVersion;
-import org.apache.hadoop.fs.CreateFlag;
-import org.apache.hadoop.fs.FsServerDefaults;
-import org.apache.hadoop.fs.permission.AclEntry;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.protocol.DirectoryListing;
-import org.apache.hadoop.hdfs.protocol.HdfsConstants;
-import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
-import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
-import org.apache.hadoop.hdfs.server.federation.router.RouterClientProtocol;
-import org.apache.hadoop.io.EnumSetWritable;
-import org.apache.hadoop.util.Lists;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.EnumSet;
-import java.util.List;
-
-import static org.apache.hadoop.crypto.CryptoProtocolVersion.ENCRYPTION_ZONES;
-import static org.apache.hadoop.fs.permission.AclEntryScope.DEFAULT;
-import static org.apache.hadoop.fs.permission.AclEntryType.USER;
-import static org.apache.hadoop.fs.permission.FsAction.ALL;
-import static org.apache.hadoop.fs.permission.FsAction.NONE;
-import static org.apache.hadoop.fs.permission.FsAction.READ;
-import static org.apache.hadoop.fs.permission.FsAction.READ_WRITE;
-import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.syncReturn;
-import static org.apache.hadoop.hdfs.server.namenode.AclTestHelpers.aclEntry;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-
-/**
- * Used to test the functionality of {@link RouterAsyncClientProtocol}.
- */
-public class TestRouterAsyncClientProtocol extends RouterAsyncProtocolTestBase {
-  private RouterAsyncClientProtocol asyncClientProtocol;
-  private RouterClientProtocol clientProtocol;
-  private final String testPath = TEST_DIR_PATH + "/test";
-
-  @Before
-  public void setup() throws IOException {
-    asyncClientProtocol = new RouterAsyncClientProtocol(getRouterConf(), getRouterAsyncRpcServer());
-    clientProtocol = new RouterClientProtocol(getRouterConf(), getRouterRpcServer());
-  }
-
-  @Test
-  public void testGetServerDefaults() throws Exception {
-    FsServerDefaults serverDefaults = clientProtocol.getServerDefaults();
-    asyncClientProtocol.getServerDefaults();
-    FsServerDefaults fsServerDefaults = syncReturn(FsServerDefaults.class);
-    assertEquals(serverDefaults.getBlockSize(), fsServerDefaults.getBlockSize());
-    assertEquals(serverDefaults.getReplication(), fsServerDefaults.getReplication());
-    assertEquals(serverDefaults.getChecksumType(), fsServerDefaults.getChecksumType());
-    assertEquals(
-        serverDefaults.getDefaultStoragePolicyId(), fsServerDefaults.getDefaultStoragePolicyId());
-  }
-
-  @Test
-  public void testClientProtocolRpc() throws Exception {
-    asyncClientProtocol.mkdirs(testPath, new FsPermission(ALL, ALL, ALL), false);
-    Boolean success = syncReturn(Boolean.class);
-    assertTrue(success);
-
-    asyncClientProtocol.setPermission(testPath, new FsPermission(READ_WRITE, READ, NONE));
-    syncReturn(Void.class);
-
-    asyncClientProtocol.getFileInfo(testPath);
-    HdfsFileStatus hdfsFileStatus = syncReturn(HdfsFileStatus.class);
-    assertEquals(hdfsFileStatus.getPermission(), new FsPermission(READ_WRITE, READ, NONE));
-
-    List<AclEntry> aclSpec = Lists.newArrayList(aclEntry(DEFAULT, USER, "tmpUser", ALL));
-    asyncClientProtocol.setAcl(testPath, aclSpec);
-    syncReturn(Void.class);
-    asyncClientProtocol.setOwner(testPath, "tmpUser", "tmpUserGroup");
-    syncReturn(Void.class);
-
-    asyncClientProtocol.getFileInfo(testPath);
-    hdfsFileStatus = syncReturn(HdfsFileStatus.class);
-    assertEquals("tmpUser", hdfsFileStatus.getOwner());
-    assertEquals("tmpUserGroup", hdfsFileStatus.getGroup());
-
-    asyncClientProtocol.create(testPath + "/testCreate.file",
-        new FsPermission(ALL, ALL, ALL), "testAsyncClient",
-        new EnumSetWritable<>(EnumSet.of(CreateFlag.CREATE)),
-        false, (short) 1, 128 * 1024 * 1024L,
-        new CryptoProtocolVersion[]{ENCRYPTION_ZONES},
-        null, null);
-    hdfsFileStatus = syncReturn(HdfsFileStatus.class);
-    assertTrue(hdfsFileStatus.isFile());
-    assertEquals(128 * 1024 * 1024, hdfsFileStatus.getBlockSize());
-
-    asyncClientProtocol.getFileRemoteLocation(testPath);
-    RemoteLocation remoteLocation = syncReturn(RemoteLocation.class);
-    assertNotNull(remoteLocation);
-    assertEquals(getNs0(), remoteLocation.getNameserviceId());
-    assertEquals(testPath, remoteLocation.getSrc());
-
-    asyncClientProtocol.getListing(testPath, new byte[1], true);
-    DirectoryListing directoryListing = syncReturn(DirectoryListing.class);
-    assertEquals(1, directoryListing.getPartialListing().length);
-
-    asyncClientProtocol.getDatanodeReport(HdfsConstants.DatanodeReportType.ALL);
-    DatanodeInfo[] datanodeInfos = syncReturn(DatanodeInfo[].class);
-    assertEquals(3, datanodeInfos.length);
-
-    asyncClientProtocol.createSymlink(testPath + "/testCreate.file",
-        "/link/link.file", new FsPermission(ALL, ALL, ALL), true);
-    syncReturn(Void.class);
-
-    asyncClientProtocol.getFileLinkInfo("/link/link.file");
-    hdfsFileStatus = syncReturn(HdfsFileStatus.class);
-    assertEquals("testCreate.file", hdfsFileStatus.getSymlink().getName());
-
-    asyncClientProtocol.rename(testPath + "/testCreate.file",
-        testPath + "/testRename.file");
-    success = syncReturn(boolean.class);
-    assertTrue(success);
-
-    asyncClientProtocol.delete(testPath, true);
-    success = syncReturn(boolean.class);
-    assertTrue(success);
-  }
-}