Browse Source

HDFS-17656. [ARR] RouterNamenodeProtocol and RouterUserProtocol supports asynchronous rpc. (#7159). Contributed by Jian Zhang.

Reviewed-by: Jian Zhang <keepromise@apache.org>
Signed-off-by: Jian Zhang <keepromise@apache.org>
Jian Zhang 5 months ago
parent
commit
33c661bd18

+ 198 - 0
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/async/RouterAsyncNamenodeProtocol.java

@@ -0,0 +1,198 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.async;
+
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
+import org.apache.hadoop.hdfs.server.federation.router.RemoteMethod;
+import org.apache.hadoop.hdfs.server.federation.router.RouterNamenodeProtocol;
+import org.apache.hadoop.hdfs.server.federation.router.RouterRpcClient;
+import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer;
+import org.apache.hadoop.hdfs.server.federation.router.async.AsyncApplyFunction;
+import org.apache.hadoop.hdfs.server.namenode.NNStorage;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import java.io.IOException;
+import java.util.Map;
+
+import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncApply;
+import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncComplete;
+import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncReturn;
+
+/**
+ * Module that implements all the asynchronous RPC calls in {@link NamenodeProtocol} in the
+ * {@link RouterRpcServer}.
+ */
+public class RouterAsyncNamenodeProtocol extends RouterNamenodeProtocol {
+
+  /** RPC server to receive client calls. */
+  private final RouterRpcServer rpcServer;
+  /** RPC clients to connect to the Namenodes. */
+  private final RouterRpcClient rpcClient;
+
+  public RouterAsyncNamenodeProtocol(RouterRpcServer server) {
+    super(server);
+    this.rpcServer = server;
+    this.rpcClient =  this.rpcServer.getRPCClient();
+  }
+
+  /**
+   * Asynchronously get a list of blocks belonging to <code>datanode</code>
+   * whose total size equals <code>size</code>.
+   *
+   * @see org.apache.hadoop.hdfs.server.balancer.Balancer
+   * @param datanode  a data node
+   * @param size      requested size
+   * @param minBlockSize each block should be of this minimum Block Size
+   * @param hotBlockTimeInterval prefer to get blocks which are belong to
+   * the cold files accessed before the time interval
+   * @param storageType the given storage type {@link StorageType}
+   * @return BlocksWithLocations a list of blocks &amp; their locations
+   * @throws IOException if size is less than or equal to 0 or
+  datanode does not exist
+   */
+  @Override
+  public BlocksWithLocations getBlocks(
+      DatanodeInfo datanode, long size,
+      long minBlockSize, long hotBlockTimeInterval, StorageType storageType) throws IOException {
+    rpcServer.checkOperation(NameNode.OperationCategory.READ);
+
+    // Get the namespace where the datanode is located
+    rpcServer.getDatanodeStorageReportMapAsync(HdfsConstants.DatanodeReportType.ALL);
+    asyncApply((AsyncApplyFunction<Map<String, DatanodeStorageReport[]>, Object>) map -> {
+      String nsId = null;
+      for (Map.Entry<String, DatanodeStorageReport[]> entry : map.entrySet()) {
+        DatanodeStorageReport[] dns = entry.getValue();
+        for (DatanodeStorageReport dn : dns) {
+          DatanodeInfo dnInfo = dn.getDatanodeInfo();
+          if (dnInfo.getDatanodeUuid().equals(datanode.getDatanodeUuid())) {
+            nsId = entry.getKey();
+            break;
+          }
+        }
+        // Break the loop if already found
+        if (nsId != null) {
+          break;
+        }
+      }
+      // Forward to the proper namenode
+      if (nsId != null) {
+        RemoteMethod method = new RemoteMethod(
+            NamenodeProtocol.class, "getBlocks", new Class<?>[]
+            {DatanodeInfo.class, long.class, long.class, long.class, StorageType.class},
+            datanode, size, minBlockSize, hotBlockTimeInterval, storageType);
+        rpcClient.invokeSingle(nsId, method, BlocksWithLocations.class);
+      } else {
+        asyncComplete(null);
+      }
+    });
+    return asyncReturn(BlocksWithLocations.class);
+  }
+
+  /**
+   * Asynchronously get the current block keys.
+   *
+   * @return ExportedBlockKeys containing current block keys
+   * @throws IOException if there is no namespace available or other ioExceptions.
+   */
+  @Override
+  public ExportedBlockKeys getBlockKeys() throws IOException {
+    rpcServer.checkOperation(NameNode.OperationCategory.READ);
+
+    RemoteMethod method =
+        new RemoteMethod(NamenodeProtocol.class, "getBlockKeys");
+    rpcServer.invokeAtAvailableNsAsync(method, ExportedBlockKeys.class);
+    return asyncReturn(ExportedBlockKeys.class);
+  }
+
+  /**
+   * Asynchronously get the most recent transaction ID.
+   *
+   * @return The most recent transaction ID that has been synced to
+   * persistent storage, or applied from persistent storage in the
+   * case of a non-active node.
+   * @throws IOException if there is no namespace available or other ioExceptions.
+   */
+  @Override
+  public long getTransactionID() throws IOException {
+    rpcServer.checkOperation(NameNode.OperationCategory.READ);
+
+    RemoteMethod method =
+        new RemoteMethod(NamenodeProtocol.class, "getTransactionID");
+    rpcServer.invokeAtAvailableNsAsync(method, long.class);
+    return asyncReturn(Long.class);
+  }
+
+  /**
+   * Asynchronously get the transaction ID of the most recent checkpoint.
+   *
+   * @return The transaction ID of the most recent checkpoint.
+   * @throws IOException if there is no namespace available or other ioExceptions.
+   */
+  @Override
+  public long getMostRecentCheckpointTxId() throws IOException {
+    rpcServer.checkOperation(NameNode.OperationCategory.READ);
+
+    RemoteMethod method =
+        new RemoteMethod(NamenodeProtocol.class, "getMostRecentCheckpointTxId");
+    rpcServer.invokeAtAvailableNsAsync(method, long.class);
+    return asyncReturn(Long.class);
+  }
+
+  /**
+   * Asynchronously get the transaction ID of the most recent checkpoint
+   * for the given NameNodeFile.
+   *
+   * @return The transaction ID of the most recent checkpoint
+   * for the given NameNodeFile.
+   * @throws IOException if there is no namespace available or other ioExceptions.
+   */
+  @Override
+  public long getMostRecentNameNodeFileTxId(NNStorage.NameNodeFile nnf)
+      throws IOException {
+    rpcServer.checkOperation(NameNode.OperationCategory.READ);
+
+    RemoteMethod method =
+        new RemoteMethod(NamenodeProtocol.class, "getMostRecentNameNodeFileTxId",
+            new Class<?>[] {NNStorage.NameNodeFile.class}, nnf);
+    rpcServer.invokeAtAvailableNsAsync(method, long.class);
+    return asyncReturn(Long.class);
+  }
+
+  /**
+   * Asynchronously request name-node version and storage information.
+   *
+   * @return {@link NamespaceInfo} identifying versions and storage information
+   *          of the name-node.
+   * @throws IOException if there is no namespace available or other ioExceptions.
+   */
+  @Override
+  public NamespaceInfo versionRequest() throws IOException {
+    rpcServer.checkOperation(NameNode.OperationCategory.READ);
+
+    RemoteMethod method =
+        new RemoteMethod(NamenodeProtocol.class, "versionRequest");
+    rpcServer.invokeAtAvailableNsAsync(method, NamespaceInfo.class);
+    return asyncReturn(NamespaceInfo.class);
+  }
+}

+ 129 - 0
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/async/RouterAsyncUserProtocol.java

@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.async;
+
+import org.apache.hadoop.hdfs.server.federation.router.RemoteMethod;
+import org.apache.hadoop.hdfs.server.federation.router.RouterRpcClient;
+import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer;
+import org.apache.hadoop.hdfs.server.federation.router.RouterUserProtocol;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Set;
+import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
+import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamespaceInfo;
+import org.apache.hadoop.hdfs.server.federation.router.async.ApplyFunction;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.security.Groups;
+import org.apache.hadoop.security.RefreshUserMappingsProtocol;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authorize.ProxyUsers;
+import org.apache.hadoop.tools.GetUserMappingsProtocol;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer.merge;
+import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncApply;
+import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncComplete;
+import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncReturn;
+
+/**
+ * Module that implements all the asynchronous RPC calls in
+ * {@link RefreshUserMappingsProtocol} {@link GetUserMappingsProtocol} in the
+ * {@link RouterRpcServer}.
+ */
+public class RouterAsyncUserProtocol extends RouterUserProtocol {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(RouterAsyncUserProtocol.class);
+
+  /** RPC server to receive client calls. */
+  private final RouterRpcServer rpcServer;
+  /** RPC clients to connect to the Namenodes. */
+  private final RouterRpcClient rpcClient;
+
+  private final ActiveNamenodeResolver namenodeResolver;
+
+  public RouterAsyncUserProtocol(RouterRpcServer server) {
+    super(server);
+    this.rpcServer = server;
+    this.rpcClient = this.rpcServer.getRPCClient();
+    this.namenodeResolver = this.rpcServer.getNamenodeResolver();
+  }
+
+  /**
+   * Asynchronously refresh user to group mappings.
+   * @throws IOException  raised on errors performing I/O.
+   */
+  @Override
+  public void refreshUserToGroupsMappings() throws IOException {
+    LOG.debug("Refresh user groups mapping in Router.");
+    rpcServer.checkOperation(NameNode.OperationCategory.UNCHECKED);
+    Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
+    if (nss.isEmpty()) {
+      Groups.getUserToGroupsMappingService().refresh();
+      asyncComplete(null);
+    } else {
+      RemoteMethod method = new RemoteMethod(RefreshUserMappingsProtocol.class,
+          "refreshUserToGroupsMappings");
+      rpcClient.invokeConcurrent(nss, method);
+    }
+  }
+
+  /**
+   * Asynchronously refresh superuser proxy group list.
+   * @throws IOException  raised on errors performing I/O.
+   */
+  @Override
+  public void refreshSuperUserGroupsConfiguration() throws IOException {
+    LOG.debug("Refresh superuser groups configuration in Router.");
+    rpcServer.checkOperation(NameNode.OperationCategory.UNCHECKED);
+    Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
+    if (nss.isEmpty()) {
+      ProxyUsers.refreshSuperUserGroupsConfiguration();
+      asyncComplete(null);
+    } else {
+      RemoteMethod method = new RemoteMethod(RefreshUserMappingsProtocol.class,
+          "refreshSuperUserGroupsConfiguration");
+      rpcClient.invokeConcurrent(nss, method);
+    }
+  }
+
+  /**
+   * Asynchronously get the groups which are mapped to the given user.
+   * @param user The user to get the groups for.
+   * @return The set of groups the user belongs to.
+   * @throws IOException raised on errors performing I/O.
+   */
+  @Override
+  public String[] getGroupsForUser(String user) throws IOException {
+    LOG.debug("Getting groups for user {}", user);
+    rpcServer.checkOperation(NameNode.OperationCategory.UNCHECKED);
+    Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
+    if (nss.isEmpty()) {
+      asyncComplete(UserGroupInformation.createRemoteUser(user)
+          .getGroupNames());
+    } else {
+      RemoteMethod method = new RemoteMethod(GetUserMappingsProtocol.class,
+          "getGroupsForUser", new Class<?>[] {String.class}, user);
+      rpcClient.invokeConcurrent(nss, method, String[].class);
+      asyncApply((ApplyFunction<Map<FederationNamespaceInfo, String[]>, String[]>)
+          results -> merge(results, String.class));
+    }
+    return asyncReturn(String[].class);
+  }
+}

+ 31 - 0
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/async/package-info.java

@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * This package contains classes that facilitate asynchronous operations within the Hadoop
+ * Distributed File System (HDFS) Federation router. These classes are designed to work with
+ * the Hadoop ecosystem, providing utilities and interfaces to perform non-blocking tasks that
+ * can improve the performance and responsiveness of HDFS operations.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+
+package org.apache.hadoop.hdfs.server.federation.async;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;

+ 5 - 1
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterAdminServer.java

@@ -21,6 +21,7 @@ import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PERMISSIONS_ENABLED_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY;
 import static org.apache.hadoop.hdfs.server.federation.fairness.RefreshFairnessPolicyControllerHandler.HANDLER_IDENTIFIER;
+import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.syncReturn;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
@@ -627,12 +628,15 @@ public class RouterAdminServer extends AbstractService
       Map<RemoteLocation, HdfsFileStatus> responses =
           rpcClient.invokeConcurrent(
               locations, method, false, false, HdfsFileStatus.class);
+      if (rpcServer.isAsync()) {
+        responses = syncReturn(Map.class);
+      }
       for (RemoteLocation location : locations) {
         if (responses.get(location) != null) {
           nsIds.add(location.getNameserviceId());
         }
       }
-    } catch (IOException ioe) {
+    } catch (Exception ioe) {
       LOG.error("Cannot get location for {}: {}",
           src, ioe.getMessage());
     }

+ 12 - 12
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java

@@ -72,7 +72,6 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException;
 import org.apache.hadoop.hdfs.HAUtil;
 import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
 import org.apache.hadoop.hdfs.protocolPB.AsyncRpcProtocolPBUtil;
@@ -398,8 +397,7 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol,
         NotReplicatedYetException.class,
         IOException.class,
         ConnectException.class,
-        RetriableException.class,
-        PathIsNotEmptyDirectoryException.class);
+        RetriableException.class);
 
     this.rpcServer.addSuppressedLoggingExceptions(
         StandbyException.class, UnresolvedPathException.class);
@@ -464,7 +462,7 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol,
   /**
    * Init router async handlers and router async responders.
    */
-  protected void initAsyncThreadPool() {
+  public void initAsyncThreadPool() {
     int asyncHandlerCount = conf.getInt(DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT,
         DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT_DEFAULT);
     int asyncResponderCount = conf.getInt(DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT,
@@ -607,7 +605,7 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol,
    * @return routerStateIdContext
    */
   @VisibleForTesting
-  protected RouterStateIdContext getRouterStateIdContext() {
+  public RouterStateIdContext getRouterStateIdContext() {
     return routerStateIdContext;
   }
 
@@ -710,7 +708,7 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol,
    * @throws StandbyException If the Router is in safe mode and cannot serve
    *                           client requests.
    */
-  void checkOperation(OperationCategory op)
+  public void checkOperation(OperationCategory op)
       throws StandbyException {
     // Log the function we are currently calling.
     if (rpcMonitor != null) {
@@ -776,8 +774,9 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol,
    * If the namespace is unavailable, retry with other namespaces.
    * @param <T> expected return type.
    * @param method the remote method.
+   * @param clazz the type of return value.
    * @return the response received after invoking method.
-   * @throws IOException
+   * @throws IOException if there is no namespace available or other ioExceptions.
    */
   <T> T invokeAtAvailableNs(RemoteMethod method, Class<T> clazz)
       throws IOException {
@@ -810,10 +809,11 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol,
    * Asynchronous version of invokeAtAvailableNs method.
    * @param <T> expected return type.
    * @param method the remote method.
+   * @param clazz the type of return value.
    * @return the response received after invoking method.
-   * @throws IOException
+   * @throws IOException if there is no namespace available or other ioExceptions.
    */
-  <T> T invokeAtAvailableNsAsync(RemoteMethod method, Class<T> clazz)
+  public <T> T invokeAtAvailableNsAsync(RemoteMethod method, Class<T> clazz)
       throws IOException {
     String nsId = subclusterResolver.getDefaultNamespace();
     // If default Ns is not present return result from first namespace.
@@ -851,7 +851,7 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol,
    * @param ioe    IOException .
    * @param nss    List of name spaces in the federation
    * @return the response received after invoking method.
-   * @throws IOException
+   * @throws IOException if there is no namespace available or other ioExceptions.
    */
   <T> T invokeOnNs(RemoteMethod method, Class<T> clazz, IOException ioe,
       Set<FederationNamespaceInfo> nss) throws IOException {
@@ -885,7 +885,7 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol,
    * @param ioe    IOException .
    * @param nss    List of name spaces in the federation
    * @return the response received after invoking method.
-   * @throws IOException
+   * @throws IOException if there is no namespace available or other ioExceptions.
    */
   <T> T invokeOnNsAsync(RemoteMethod method, Class<T> clazz, IOException ioe,
       Set<FederationNamespaceInfo> nss) throws IOException {
@@ -2131,7 +2131,7 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol,
    * @param clazz Class of the values.
    * @return Array with the outputs.
    */
-  static <T> T[] merge(
+  public static <T> T[] merge(
       Map<FederationNamespaceInfo, T[]> map, Class<T> clazz) {
 
     // Put all results into a set to avoid repeats

+ 164 - 0
hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/async/RouterAsyncProtocolTestBase.java

@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.async;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster;
+import org.apache.hadoop.hdfs.server.federation.MockResolver;
+import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
+import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;
+import org.apache.hadoop.hdfs.server.federation.router.RouterAsyncRpcClient;
+import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer;
+import org.apache.hadoop.ipc.CallerContext;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.mockito.Mockito;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.NAMENODES;
+import static org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.DEFAULT_HEARTBEAT_INTERVAL_MS;
+import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT;
+import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Used to test the functionality of async router rps.
+ */
+public class RouterAsyncProtocolTestBase {
+  private static Configuration routerConf;
+  /** Federated HDFS cluster. */
+  private static MiniRouterDFSCluster cluster;
+  private static String ns0;
+
+  /** Random Router for this federated cluster. */
+  private MiniRouterDFSCluster.RouterContext router;
+  private FileSystem routerFs;
+  private RouterRpcServer routerRpcServer;
+  private RouterRpcServer routerAsyncRpcServer;
+
+  @BeforeClass
+  public static void setUpCluster() throws Exception {
+    cluster = new MiniRouterDFSCluster(true, 1, 2,
+        DEFAULT_HEARTBEAT_INTERVAL_MS, 1000);
+    cluster.setNumDatanodesPerNameservice(3);
+
+    cluster.startCluster();
+
+    // Making one Namenode active per nameservice
+    if (cluster.isHighAvailability()) {
+      for (String ns : cluster.getNameservices()) {
+        cluster.switchToActive(ns, NAMENODES[0]);
+        cluster.switchToStandby(ns, NAMENODES[1]);
+      }
+    }
+    // Start routers with only an RPC service
+    routerConf = new RouterConfigBuilder()
+        .rpc()
+        .build();
+
+    // Reduce the number of RPC clients threads to overload the Router easy
+    routerConf.setInt(RBFConfigKeys.DFS_ROUTER_CLIENT_THREADS_SIZE, 1);
+    routerConf.setInt(DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT, 1);
+    routerConf.setInt(DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT, 1);
+    // We decrease the DN cache times to make the test faster
+    routerConf.setTimeDuration(
+        RBFConfigKeys.DN_REPORT_CACHE_EXPIRE, 1, TimeUnit.SECONDS);
+    cluster.addRouterOverrides(routerConf);
+    // Start routers with only an RPC service
+    cluster.startRouters();
+
+    // Register and verify all NNs with all routers
+    cluster.registerNamenodes();
+    cluster.waitNamenodeRegistration();
+    cluster.waitActiveNamespaces();
+    ns0 = cluster.getNameservices().get(0);
+  }
+
+  @AfterClass
+  public static void shutdownCluster() throws Exception {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  @Before
+  public void setUp() throws IOException {
+    router = cluster.getRandomRouter();
+    routerFs = router.getFileSystem();
+    routerRpcServer = router.getRouterRpcServer();
+    routerRpcServer.initAsyncThreadPool();
+    RouterAsyncRpcClient asyncRpcClient = new RouterAsyncRpcClient(
+        routerConf, router.getRouter(), routerRpcServer.getNamenodeResolver(),
+        routerRpcServer.getRPCMonitor(),
+        routerRpcServer.getRouterStateIdContext());
+    routerAsyncRpcServer = Mockito.spy(routerRpcServer);
+    Mockito.when(routerAsyncRpcServer.getRPCClient()).thenReturn(asyncRpcClient);
+
+    // Create mock locations
+    MockResolver resolver = (MockResolver) router.getRouter().getSubclusterResolver();
+    resolver.addLocation("/", ns0, "/");
+    FsPermission permission = new FsPermission("705");
+    routerFs.mkdirs(new Path("/testdir"), permission);
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    // clear client context
+    CallerContext.setCurrent(null);
+    boolean delete = routerFs.delete(new Path("/testdir"));
+    assertTrue(delete);
+    if (routerFs != null) {
+      routerFs.close();
+    }
+  }
+
+  public static Configuration getRouterConf() {
+    return routerConf;
+  }
+
+  public static MiniRouterDFSCluster getCluster() {
+    return cluster;
+  }
+
+  public static String getNs0() {
+    return ns0;
+  }
+
+  public MiniRouterDFSCluster.RouterContext getRouter() {
+    return router;
+  }
+
+  public FileSystem getRouterFs() {
+    return routerFs;
+  }
+
+  public RouterRpcServer getRouterRpcServer() {
+    return routerRpcServer;
+  }
+
+  public RouterRpcServer getRouterAsyncRpcServer() {
+    return routerAsyncRpcServer;
+  }
+}

+ 126 - 0
hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/async/TestRouterAsyncNamenodeProtocol.java

@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.async;
+
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
+import org.apache.hadoop.hdfs.server.federation.router.RouterNamenodeProtocol;
+import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
+import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.syncReturn;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+/**
+ * Used to test the functionality of {@link RouterAsyncNamenodeProtocol}.
+ */
+public class TestRouterAsyncNamenodeProtocol extends RouterAsyncProtocolTestBase {
+
+  private RouterAsyncNamenodeProtocol asyncNamenodeProtocol;
+  private RouterNamenodeProtocol namenodeProtocol;
+
+  @Before
+  public void setup() throws Exception {
+    asyncNamenodeProtocol = new RouterAsyncNamenodeProtocol(getRouterAsyncRpcServer());
+    namenodeProtocol = new RouterNamenodeProtocol(getRouterRpcServer());
+  }
+
+  @Test
+  public void getBlocks() throws Exception {
+    DatanodeInfo[] dns = getRouter().getClient()
+        .getNamenode().getDatanodeReport(HdfsConstants.DatanodeReportType.ALL);
+
+    DatanodeInfo dn0 = dns[0];
+    asyncNamenodeProtocol.getBlocks(dn0, 1024, 0, 0,
+        null);
+    BlocksWithLocations asyncRouterBlockLocations = syncReturn(BlocksWithLocations.class);
+    assertNotNull(asyncRouterBlockLocations);
+
+    BlocksWithLocations syncRouterBlockLocations = namenodeProtocol.getBlocks(dn0, 1024,
+        0, 0, null);
+
+    BlockWithLocations[] asyncRouterBlocks = asyncRouterBlockLocations.getBlocks();
+    BlockWithLocations[] syncRouterBlocks = syncRouterBlockLocations.getBlocks();
+
+    assertEquals(asyncRouterBlocks.length, syncRouterBlocks.length);
+    for (int i = 0; i < syncRouterBlocks.length; i++) {
+      assertEquals(
+          asyncRouterBlocks[i].getBlock().getBlockId(),
+          syncRouterBlocks[i].getBlock().getBlockId());
+    }
+  }
+
+  @Test
+  public void getBlockKeys() throws Exception {
+    asyncNamenodeProtocol.getBlockKeys();
+    ExportedBlockKeys asyncBlockKeys = syncReturn(ExportedBlockKeys.class);
+    assertNotNull(asyncBlockKeys);
+
+    ExportedBlockKeys syncBlockKeys = namenodeProtocol.getBlockKeys();
+    compareBlockKeys(asyncBlockKeys, syncBlockKeys);
+  }
+
+  @Test
+  public void getTransactionID() throws Exception {
+    asyncNamenodeProtocol.getTransactionID();
+    long asyncTransactionID = syncReturn(Long.class);
+    assertNotNull(asyncTransactionID);
+
+    long transactionID = namenodeProtocol.getTransactionID();
+    assertEquals(asyncTransactionID, transactionID);
+  }
+
+  @Test
+  public void getMostRecentCheckpointTxId() throws Exception {
+    asyncNamenodeProtocol.getMostRecentCheckpointTxId();
+    long asyncMostRecentCheckpointTxId = syncReturn(Long.class);
+    assertNotNull(asyncMostRecentCheckpointTxId);
+
+    long mostRecentCheckpointTxId = namenodeProtocol.getMostRecentCheckpointTxId();
+    assertEquals(asyncMostRecentCheckpointTxId, mostRecentCheckpointTxId);
+  }
+
+  @Test
+  public void versionRequest() throws Exception {
+    asyncNamenodeProtocol.versionRequest();
+    NamespaceInfo asyncNamespaceInfo = syncReturn(NamespaceInfo.class);
+    assertNotNull(asyncNamespaceInfo);
+    NamespaceInfo syncNamespaceInfo = namenodeProtocol.versionRequest();
+    compareVersion(asyncNamespaceInfo, syncNamespaceInfo);
+  }
+
+  private void compareBlockKeys(
+      ExportedBlockKeys blockKeys, ExportedBlockKeys otherBlockKeys) {
+    assertEquals(blockKeys.getCurrentKey(), otherBlockKeys.getCurrentKey());
+    assertEquals(blockKeys.getKeyUpdateInterval(), otherBlockKeys.getKeyUpdateInterval());
+    assertEquals(blockKeys.getTokenLifetime(), otherBlockKeys.getTokenLifetime());
+  }
+
+  private void compareVersion(NamespaceInfo version, NamespaceInfo otherVersion) {
+    assertEquals(version.getBlockPoolID(), otherVersion.getBlockPoolID());
+    assertEquals(version.getNamespaceID(), otherVersion.getNamespaceID());
+    assertEquals(version.getClusterID(), otherVersion.getClusterID());
+    assertEquals(version.getLayoutVersion(), otherVersion.getLayoutVersion());
+    assertEquals(version.getCTime(), otherVersion.getCTime());
+  }
+}

+ 48 - 0
hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/async/TestRouterAsyncUserProtocol.java

@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.async;
+
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.syncReturn;
+import static org.junit.Assert.assertArrayEquals;
+
+/**
+ * Used to test the functionality of {@link RouterAsyncUserProtocol}.
+ */
+public class TestRouterAsyncUserProtocol extends RouterAsyncProtocolTestBase {
+
+  private RouterAsyncUserProtocol asyncUserProtocol;
+
+  @Before
+  public void setup() throws Exception {
+    asyncUserProtocol = new RouterAsyncUserProtocol(getRouterAsyncRpcServer());
+  }
+
+  @Test
+  public void testgetGroupsForUser() throws Exception {
+    String[] group = new String[] {"bar", "group2"};
+    UserGroupInformation.createUserForTesting("user",
+        new String[] {"bar", "group2"});
+    asyncUserProtocol.getGroupsForUser("user");
+    String[] result = syncReturn(String[].class);
+    assertArrayEquals(group, result);
+  }
+}