Explorar el Código

HDFS-17672. [ARR] Move asynchronous related classes to the async package. (#7184). Contributed by Jian Zhang.

Jian Zhang hace 5 meses
padre
commit
d4a6a27138
Se han modificado 47 ficheros con 649 adiciones y 787 borrados
  1. 4 4
      hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/protocolPB/AsyncRpcProtocolPBUtil.java
  2. 3 2
      hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Quota.java
  3. 1 1
      hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterAdminServer.java
  4. 1 1
      hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterQuotaUpdateService.java
  5. 9 9
      hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java
  6. 16 15
      hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
  7. 10 3
      hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterStateIdContext.java
  8. 79 4
      hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/AsyncErasureCoding.java
  9. 18 4
      hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/AsyncQuota.java
  10. 38 4
      hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncCacheAdmin.java
  11. 5 5
      hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncNamenodeProtocol.java
  12. 26 16
      hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncRpcClient.java
  13. 65 4
      hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncSnapshot.java
  14. 29 2
      hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncStoragePolicy.java
  15. 8 5
      hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncUserProtocol.java
  16. 0 4
      hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/package-info.java
  17. 2 2
      hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/utils/ApplyFunction.java
  18. 1 1
      hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/utils/Async.java
  19. 2 2
      hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/utils/AsyncApplyFunction.java
  20. 1 1
      hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/utils/AsyncBiFunction.java
  21. 3 3
      hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/utils/AsyncCatchFunction.java
  22. 2 2
      hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/utils/AsyncForEachRun.java
  23. 1 1
      hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/utils/AsyncRun.java
  24. 3 3
      hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/utils/AsyncUtil.java
  25. 4 4
      hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/utils/CatchFunction.java
  26. 2 2
      hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/utils/FinallyFunction.java
  27. 5 1
      hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/utils/package-info.java
  28. 2 2
      hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestAsyncRpcProtocolPBUtil.java
  29. 1 1
      hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestRouterClientSideTranslatorPB.java
  30. 0 195
      hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAsyncCacheAdmin.java
  31. 0 191
      hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAsyncRpcServer.java
  32. 0 163
      hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAsyncStoragePolicy.java
  33. 1 2
      hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncProtocolTestBase.java
  34. 102 0
      hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncCacheAdmin.java
  35. 4 2
      hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncErasureCoding.java
  36. 2 2
      hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncNamenodeProtocol.java
  37. 4 2
      hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncQuota.java
  38. 7 2
      hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncRpcClient.java
  39. 96 0
      hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncRpcServer.java
  40. 11 103
      hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncSnapshot.java
  41. 66 0
      hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncStoragePolicy.java
  42. 2 2
      hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncUserProtocol.java
  43. 10 10
      hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/utils/AsyncClass.java
  44. 1 1
      hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/utils/BaseClass.java
  45. 1 1
      hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/utils/SyncClass.java
  46. 1 1
      hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/utils/TestAsyncUtil.java
  47. 0 2
      hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/security/TestRouterSecurityManager.java

+ 4 - 4
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/protocolPB/AsyncRpcProtocolPBUtil.java

@@ -19,7 +19,7 @@
 package org.apache.hadoop.hdfs.protocolPB;
 
 import org.apache.hadoop.hdfs.server.federation.router.ThreadLocalContext;
-import org.apache.hadoop.hdfs.server.federation.router.async.ApplyFunction;
+import org.apache.hadoop.hdfs.server.federation.router.async.utils.ApplyFunction;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.ipc.Client;
 import org.apache.hadoop.ipc.ProtobufRpcEngine2;
@@ -32,9 +32,9 @@ import java.io.IOException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 
-import static org.apache.hadoop.hdfs.server.federation.router.async.Async.warpCompletionException;
-import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncCompleteWith;
-import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncReturn;
+import static org.apache.hadoop.hdfs.server.federation.router.async.utils.Async.warpCompletionException;
+import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncCompleteWith;
+import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncReturn;
 import static org.apache.hadoop.ipc.internal.ShadedProtobufHelper.ipc;
 
 /**

+ 3 - 2
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Quota.java

@@ -139,7 +139,7 @@ public class Quota {
    * @return quota usage for each remote location.
    * @throws IOException If the quota system is disabled.
    */
-  Map<RemoteLocation, QuotaUsage> getEachQuotaUsage(String path)
+  protected Map<RemoteLocation, QuotaUsage> getEachQuotaUsage(String path)
       throws IOException {
     rpcServer.checkOperation(OperationCategory.READ);
     if (!router.isQuotaEnabled()) {
@@ -252,8 +252,9 @@ public class Quota {
    * @param path Federation path of the results.
    * @param results Quota query result.
    * @return Aggregated Quota.
+   * @throws IOException If the quota system is disabled.
    */
-  QuotaUsage aggregateQuota(String path,
+  protected QuotaUsage aggregateQuota(String path,
       Map<RemoteLocation, QuotaUsage> results) throws IOException {
     long nsCount = 0;
     long ssCount = 0;

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterAdminServer.java

@@ -21,7 +21,7 @@ import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PERMISSIONS_ENABLED_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY;
 import static org.apache.hadoop.hdfs.server.federation.fairness.RefreshFairnessPolicyControllerHandler.HANDLER_IDENTIFIER;
-import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.syncReturn;
+import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.syncReturn;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterQuotaUpdateService.java

@@ -41,7 +41,7 @@ import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.syncReturn;
+import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.syncReturn;
 
 /**
  * Service to periodically update the {@link RouterQuotaUsage}

+ 9 - 9
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java

@@ -1940,7 +1940,7 @@ public class RouterRpcClient {
    * @return A prioritized list of NNs to use for communication.
    * @throws IOException If a NN cannot be located for the nameservice ID.
    */
-  protected List<? extends FederationNamenodeContext> getOrderedNamenodes(String nsId,
+  public List<? extends FederationNamenodeContext> getOrderedNamenodes(String nsId,
       boolean isObserverRead) throws IOException {
     final List<? extends FederationNamenodeContext> namenodes;
 
@@ -2047,39 +2047,39 @@ public class RouterRpcClient {
     private static final byte SHOULD_USE_OBSERVER_BIT = 2;
     private static final byte COMPLETE_BIT = 4;
 
-    ExecutionStatus() {
+    public ExecutionStatus() {
       this(false, false);
     }
 
-    ExecutionStatus(boolean failOver, boolean shouldUseObserver) {
+    public ExecutionStatus(boolean failOver, boolean shouldUseObserver) {
       this.flag = 0;
       setFailOver(failOver);
       setShouldUseObserver(shouldUseObserver);
       setComplete(false);
     }
 
-    private void setFailOver(boolean failOver) {
+    public void setFailOver(boolean failOver) {
       flag = (byte) (failOver ? (flag | FAIL_OVER_BIT) : (flag & ~FAIL_OVER_BIT));
     }
 
-    private void setShouldUseObserver(boolean shouldUseObserver) {
+    public void setShouldUseObserver(boolean shouldUseObserver) {
       flag = (byte) (shouldUseObserver ?
           (flag | SHOULD_USE_OBSERVER_BIT) : (flag & ~SHOULD_USE_OBSERVER_BIT));
     }
 
-    void setComplete(boolean complete) {
+    public void setComplete(boolean complete) {
       flag = (byte) (complete ? (flag | COMPLETE_BIT) : (flag & ~COMPLETE_BIT));
     }
 
-    boolean isFailOver() {
+    public boolean isFailOver() {
       return (flag & FAIL_OVER_BIT) != 0;
     }
 
-    boolean isShouldUseObserver() {
+    public boolean isShouldUseObserver() {
       return (flag &  SHOULD_USE_OBSERVER_BIT) != 0;
     }
 
-    boolean isComplete() {
+    public boolean isComplete() {
       return (flag & COMPLETE_BIT) != 0;
     }
   }

+ 16 - 15
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java

@@ -37,12 +37,12 @@ import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DN_R
 import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FEDERATION_RENAME_OPTION;
 import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FEDERATION_RENAME_OPTION_DEFAULT;
 import static org.apache.hadoop.hdfs.server.federation.router.RouterFederationRename.RouterRenameOption;
-import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncApply;
-import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncCatch;
-import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncComplete;
-import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncForEach;
-import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncReturn;
-import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncTry;
+import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncApply;
+import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncCatch;
+import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncComplete;
+import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncForEach;
+import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncReturn;
+import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncTry;
 import static org.apache.hadoop.tools.fedbalance.FedBalanceConfigs.SCHEDULER_JOURNAL_URI;
 
 import java.io.FileNotFoundException;
@@ -75,9 +75,10 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.HAUtil;
 import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
 import org.apache.hadoop.hdfs.protocolPB.AsyncRpcProtocolPBUtil;
-import org.apache.hadoop.hdfs.server.federation.router.async.ApplyFunction;
-import org.apache.hadoop.hdfs.server.federation.router.async.AsyncCatchFunction;
-import org.apache.hadoop.hdfs.server.federation.router.async.CatchFunction;
+import org.apache.hadoop.hdfs.server.federation.router.async.RouterAsyncRpcClient;
+import org.apache.hadoop.hdfs.server.federation.router.async.utils.ApplyFunction;
+import org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncCatchFunction;
+import org.apache.hadoop.hdfs.server.federation.router.async.utils.CatchFunction;
 import org.apache.hadoop.thirdparty.com.google.common.cache.CacheBuilder;
 import org.apache.hadoop.thirdparty.com.google.common.cache.CacheLoader;
 import org.apache.hadoop.thirdparty.com.google.common.cache.LoadingCache;
@@ -686,7 +687,7 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol,
    *                           client requests.
    * @throws UnsupportedOperationException If the operation is not supported.
    */
-  void checkOperation(OperationCategory op, boolean supported)
+  public void checkOperation(OperationCategory op, boolean supported)
       throws StandbyException, UnsupportedOperationException {
     checkOperation(op);
 
@@ -1032,7 +1033,7 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol,
    * @return The remote location for this file.
    * @throws IOException If the file has no creation location.
    */
-  RemoteLocation getCreateLocationAsync(
+  public RemoteLocation getCreateLocationAsync(
       final String src, final List<RemoteLocation> locations)
       throws IOException {
 
@@ -1995,7 +1996,7 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol,
    * @return Prioritized list of locations in the federated cluster.
    * @throws IOException If the location for this path cannot be determined.
    */
-  protected List<RemoteLocation> getLocationsForPath(String path,
+  public List<RemoteLocation> getLocationsForPath(String path,
       boolean failIfLocked) throws IOException {
     return getLocationsForPath(path, failIfLocked, true);
   }
@@ -2010,7 +2011,7 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol,
    * @return Prioritized list of locations in the federated cluster.
    * @throws IOException If the location for this path cannot be determined.
    */
-  protected List<RemoteLocation> getLocationsForPath(String path,
+  public List<RemoteLocation> getLocationsForPath(String path,
       boolean failIfLocked, boolean needQuotaVerify) throws IOException {
     try {
       if (failIfLocked) {
@@ -2227,9 +2228,9 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol,
    * mount entry.
    * @param path The path on which the operation need to be invoked.
    * @return true if the call is supposed to invoked on all locations.
-   * @throws IOException
+   * @throws IOException If an I/O error occurs.
    */
-  boolean isInvokeConcurrent(final String path) throws IOException {
+  public boolean isInvokeConcurrent(final String path) throws IOException {
     if (subclusterResolver instanceof MountTableResolver) {
       MountTableResolver mountTableResolver =
           (MountTableResolver) subclusterResolver;

+ 10 - 3
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterStateIdContext.java

@@ -48,7 +48,7 @@ import org.apache.hadoop.thirdparty.protobuf.InvalidProtocolBufferException;
  */
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
-class RouterStateIdContext implements AlignmentContext {
+public class RouterStateIdContext implements AlignmentContext {
 
   private final HashSet<String> coordinatedMethods;
   /**
@@ -93,6 +93,8 @@ class RouterStateIdContext implements AlignmentContext {
 
   /**
    * Adds the {@link #namespaceIdMap} to the response header that will be sent to a client.
+   *
+   * @param headerBuilder the response header that will be sent to a client.
    */
   public void setResponseHeaderState(RpcResponseHeaderProto.Builder headerBuilder) {
     if (namespaceIdMap.isEmpty()) {
@@ -110,7 +112,8 @@ class RouterStateIdContext implements AlignmentContext {
   }
 
   public LongAccumulator getNamespaceStateId(String nsId) {
-    return namespaceIdMap.computeIfAbsent(nsId, key -> new LongAccumulator(Math::max, Long.MIN_VALUE));
+    return namespaceIdMap.computeIfAbsent(nsId,
+        key -> new LongAccumulator(Math::max, Long.MIN_VALUE));
   }
 
   public List<String> getNamespaces() {
@@ -127,6 +130,9 @@ class RouterStateIdContext implements AlignmentContext {
 
   /**
    * Utility function to parse routerFederatedState field in RPC headers.
+   *
+   * @param byteString the byte string of routerFederatedState.
+   * @return the router federated state map.
    */
   public static Map<String, Long> getRouterFederatedStateMap(ByteString byteString) {
     if (byteString != null) {
@@ -148,7 +154,8 @@ class RouterStateIdContext implements AlignmentContext {
     if (call != null) {
       ByteString callFederatedNamespaceState = call.getFederatedNamespaceState();
       if (callFederatedNamespaceState != null) {
-        Map<String, Long> clientFederatedStateIds = getRouterFederatedStateMap(callFederatedNamespaceState);
+        Map<String, Long> clientFederatedStateIds =
+            getRouterFederatedStateMap(callFederatedNamespaceState);
         clientStateID = clientFederatedStateIds.getOrDefault(nsId, Long.MIN_VALUE);
       }
     }

+ 79 - 4
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/AsyncErasureCoding.java → hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/AsyncErasureCoding.java

@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.hdfs.server.federation.router;
+package org.apache.hadoop.hdfs.server.federation.router.async;
 
 import org.apache.hadoop.hdfs.protocol.AddErasureCodingPolicyResponse;
 import org.apache.hadoop.hdfs.protocol.ECBlockGroupStats;
@@ -25,7 +25,12 @@ import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicyInfo;
 import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
 import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamespaceInfo;
 import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
-import org.apache.hadoop.hdfs.server.federation.router.async.ApplyFunction;
+import org.apache.hadoop.hdfs.server.federation.router.ErasureCoding;
+import org.apache.hadoop.hdfs.server.federation.router.RemoteMethod;
+import org.apache.hadoop.hdfs.server.federation.router.RemoteParam;
+import org.apache.hadoop.hdfs.server.federation.router.RouterRpcClient;
+import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer;
+import org.apache.hadoop.hdfs.server.federation.router.async.utils.ApplyFunction;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 
 import java.io.IOException;
@@ -36,9 +41,15 @@ import java.util.Map;
 import java.util.Set;
 
 import static org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer.merge;
-import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncApply;
-import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncReturn;
+import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncApply;
+import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncReturn;
 
+/**
+ * Provides asynchronous operations for erasure coding in HDFS Federation.
+ * This class extends {@link org.apache.hadoop.hdfs.server.federation.router.ErasureCoding}
+ * and overrides its methods to perform erasure coding operations in a non-blocking manner,
+ * allowing for concurrent execution and improved performance.
+ */
 public class AsyncErasureCoding extends ErasureCoding {
   /** RPC server to receive client calls. */
   private final RouterRpcServer rpcServer;
@@ -54,6 +65,17 @@ public class AsyncErasureCoding extends ErasureCoding {
     this.namenodeResolver = this.rpcClient.getNamenodeResolver();
   }
 
+  /**
+   * Asynchronously get an array of all erasure coding policies.
+   * This method checks the operation category and then invokes the
+   * getErasureCodingPolicies method concurrently across all namespaces.
+   * <p>
+   * The results are merged and returned as an array of ErasureCodingPolicyInfo.
+   *
+   * @return Array of ErasureCodingPolicyInfo.
+   * @throws IOException If an I/O error occurs.
+   */
+  @Override
   public ErasureCodingPolicyInfo[] getErasureCodingPolicies()
       throws IOException {
     rpcServer.checkOperation(NameNode.OperationCategory.READ);
@@ -70,6 +92,16 @@ public class AsyncErasureCoding extends ErasureCoding {
     return asyncReturn(ErasureCodingPolicyInfo[].class);
   }
 
+  /**
+   * Asynchronously get the erasure coding codecs available.
+   * This method checks the operation category and then invokes the
+   * getErasureCodingCodecs method concurrently across all namespaces.
+   * <p>
+   * The results are merged into a single map of codec names to codec properties.
+   *
+   * @return Map of erasure coding codecs.
+   * @throws IOException If an I/O error occurs.
+   */
   @Override
   public Map<String, String> getErasureCodingCodecs() throws IOException {
     rpcServer.checkOperation(NameNode.OperationCategory.READ);
@@ -97,6 +129,17 @@ public class AsyncErasureCoding extends ErasureCoding {
     return asyncReturn(Map.class);
   }
 
+  /**
+   * Asynchronously add an array of erasure coding policies.
+   * This method checks the operation category and then invokes the
+   * addErasureCodingPolicies method concurrently across all namespaces.
+   * <p>
+   * The results are merged and returned as an array of AddErasureCodingPolicyResponse.
+   *
+   * @param policies Array of erasure coding policies to add.
+   * @return Array of AddErasureCodingPolicyResponse.
+   * @throws IOException If an I/O error occurs.
+   */
   @Override
   public AddErasureCodingPolicyResponse[] addErasureCodingPolicies(
       ErasureCodingPolicy[] policies) throws IOException {
@@ -117,6 +160,17 @@ public class AsyncErasureCoding extends ErasureCoding {
     return asyncReturn(AddErasureCodingPolicyResponse[].class);
   }
 
+  /**
+   * Asynchronously get the erasure coding policy for a given source path.
+   * This method checks the operation category and then invokes the
+   * getErasureCodingPolicy method sequentially for the given path.
+   * <p>
+   * The result is returned as an ErasureCodingPolicy object.
+   *
+   * @param src Source path to get the erasure coding policy for.
+   * @return ErasureCodingPolicy for the given path.
+   * @throws IOException If an I/O error occurs.
+   */
   @Override
   public ErasureCodingPolicy getErasureCodingPolicy(String src)
       throws IOException {
@@ -136,6 +190,17 @@ public class AsyncErasureCoding extends ErasureCoding {
     return asyncReturn(ErasureCodingPolicy.class);
   }
 
+  /**
+   * Asynchronously get the EC topology result for the given policies.
+   * This method checks the operation category and then invokes the
+   * getECTopologyResultForPolicies method concurrently across all namespaces.
+   * <p>
+   * The results are merged and the first unsupported result is returned.
+   *
+   * @param policyNames Array of policy names to check.
+   * @return ECTopologyVerifierResult for the policies.
+   * @throws IOException If an I/O error occurs.
+   */
   @Override
   public ECTopologyVerifierResult getECTopologyResultForPolicies(
       String[] policyNames) throws IOException {
@@ -162,6 +227,16 @@ public class AsyncErasureCoding extends ErasureCoding {
     return asyncReturn(ECTopologyVerifierResult.class);
   }
 
+  /**
+   * Asynchronously get the erasure coding block group statistics.
+   * This method checks the operation category and then invokes the
+   * getECBlockGroupStats method concurrently across all namespaces.
+   * <p>
+   * The results are merged and returned as an ECBlockGroupStats object.
+   *
+   * @return ECBlockGroupStats for the erasure coding block groups.
+   * @throws IOException If an I/O error occurs.
+   */
   @Override
   public ECBlockGroupStats getECBlockGroupStats() throws IOException {
     rpcServer.checkOperation(NameNode.OperationCategory.READ);

+ 18 - 4
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/AsyncQuota.java → hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/AsyncQuota.java

@@ -15,10 +15,16 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.hdfs.server.federation.router;
+package org.apache.hadoop.hdfs.server.federation.router.async;
 
 import org.apache.hadoop.fs.QuotaUsage;
 import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
+import org.apache.hadoop.hdfs.server.federation.router.Quota;
+import org.apache.hadoop.hdfs.server.federation.router.RemoteMethod;
+import org.apache.hadoop.hdfs.server.federation.router.RemoteParam;
+import org.apache.hadoop.hdfs.server.federation.router.Router;
+import org.apache.hadoop.hdfs.server.federation.router.RouterRpcClient;
+import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 
 import java.io.IOException;
@@ -26,9 +32,15 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletionException;
 
-import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncApply;
-import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncReturn;
+import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncApply;
+import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncReturn;
 
+/**
+ * Provides asynchronous operations for managing quotas in HDFS Federation.
+ * This class extends {@link org.apache.hadoop.hdfs.server.federation.router.Quota}
+ * and overrides its methods to perform quota operations in a non-blocking manner,
+ * allowing for concurrent execution and improved performance.
+ */
 public class AsyncQuota extends Quota {
 
   /** RPC server to receive client calls. */
@@ -50,6 +62,7 @@ public class AsyncQuota extends Quota {
    * @return Aggregated quota.
    * @throws IOException If the quota system is disabled.
    */
+  @Override
   public QuotaUsage getQuotaUsage(String path) throws IOException {
     getEachQuotaUsage(path);
 
@@ -70,7 +83,8 @@ public class AsyncQuota extends Quota {
    * @return quota usage for each remote location.
    * @throws IOException If the quota system is disabled.
    */
-  Map<RemoteLocation, QuotaUsage> getEachQuotaUsage(String path)
+  @Override
+  protected Map<RemoteLocation, QuotaUsage> getEachQuotaUsage(String path)
       throws IOException {
     rpcServer.checkOperation(NameNode.OperationCategory.READ);
     if (!router.isQuotaEnabled()) {

+ 38 - 4
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterAsyncCacheAdmin.java → hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncCacheAdmin.java

@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.hadoop.hdfs.server.federation.router;
+package org.apache.hadoop.hdfs.server.federation.router.async;
 
 import org.apache.hadoop.fs.CacheFlag;
 import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
@@ -25,14 +25,16 @@ import org.apache.hadoop.hdfs.protocol.CachePoolEntry;
 import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedEntries;
 import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamespaceInfo;
 import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
-import org.apache.hadoop.hdfs.server.federation.router.async.ApplyFunction;
+import org.apache.hadoop.hdfs.server.federation.router.RouterCacheAdmin;
+import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer;
+import org.apache.hadoop.hdfs.server.federation.router.async.utils.ApplyFunction;
 
 import java.io.IOException;
 import java.util.EnumSet;
 import java.util.Map;
 
-import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncApply;
-import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncReturn;
+import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncApply;
+import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncReturn;
 
 /**
  * Module that implements all the asynchronous RPC calls in
@@ -45,6 +47,17 @@ public class RouterAsyncCacheAdmin extends RouterCacheAdmin {
     super(server);
   }
 
+  /**
+   * Asynchronously adds a new cache directive with the given path and flags.
+   * This method invokes the addCacheDirective method concurrently across all
+   * namespaces, and returns the first response as a long value representing the
+   * directive ID.
+   *
+   * @param path The cache directive path.
+   * @param flags The cache flags.
+   * @return The ID of the newly added cache directive.
+   * @throws IOException If an I/O error occurs.
+   */
   @Override
   public long addCacheDirective(
       CacheDirectiveInfo path, EnumSet<CacheFlag> flags) throws IOException {
@@ -54,6 +67,17 @@ public class RouterAsyncCacheAdmin extends RouterCacheAdmin {
     return asyncReturn(Long.class);
   }
 
+  /**
+   * Asynchronously lists cache directives based on the provided previous ID and filter.
+   * This method invokes the listCacheDirectives method concurrently across all
+   * namespaces, and returns the first response as a BatchedEntries object containing
+   * the cache directive entries.
+   *
+   * @param prevId The previous ID from which to start listing.
+   * @param filter The filter to apply to the cache directives.
+   * @return BatchedEntries of cache directive entries.
+   * @throws IOException If an I/O error occurs.
+   */
   @Override
   public BatchedEntries<CacheDirectiveEntry> listCacheDirectives(
       long prevId, CacheDirectiveInfo filter) throws IOException {
@@ -64,6 +88,16 @@ public class RouterAsyncCacheAdmin extends RouterCacheAdmin {
     return asyncReturn(BatchedEntries.class);
   }
 
+  /**
+   * Asynchronously lists cache pools starting from the provided key.
+   * This method invokes the listCachePools method concurrently across all namespaces,
+   * and returns the first response as a BatchedEntries object containing the cache
+   * pool entries.
+   *
+   * @param prevKey The previous key from which to start listing.
+   * @return BatchedEntries of cache pool entries.
+   * @throws IOException If an I/O error occurs.
+   */
   @Override
   public BatchedEntries<CachePoolEntry> listCachePools(String prevKey) throws IOException {
     invokeListCachePools(prevKey);

+ 5 - 5
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/async/RouterAsyncNamenodeProtocol.java → hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncNamenodeProtocol.java

@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.hdfs.server.federation.async;
+package org.apache.hadoop.hdfs.server.federation.router.async;
 
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@@ -25,7 +25,7 @@ import org.apache.hadoop.hdfs.server.federation.router.RemoteMethod;
 import org.apache.hadoop.hdfs.server.federation.router.RouterNamenodeProtocol;
 import org.apache.hadoop.hdfs.server.federation.router.RouterRpcClient;
 import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer;
-import org.apache.hadoop.hdfs.server.federation.router.async.AsyncApplyFunction;
+import org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncApplyFunction;
 import org.apache.hadoop.hdfs.server.namenode.NNStorage;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
@@ -35,9 +35,9 @@ import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import java.io.IOException;
 import java.util.Map;
 
-import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncApply;
-import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncComplete;
-import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncReturn;
+import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncApply;
+import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncComplete;
+import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncReturn;
 
 /**
  * Module that implements all the asynchronous RPC calls in {@link NamenodeProtocol} in the

+ 26 - 16
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterAsyncRpcClient.java → hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncRpcClient.java

@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.hdfs.server.federation.router;
+package org.apache.hadoop.hdfs.server.federation.router.async;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.NameNodeProxiesClient;
@@ -24,9 +24,19 @@ import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
 import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeContext;
 import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState;
 import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
-import org.apache.hadoop.hdfs.server.federation.router.async.ApplyFunction;
-import org.apache.hadoop.hdfs.server.federation.router.async.AsyncApplyFunction;
-import org.apache.hadoop.hdfs.server.federation.router.async.AsyncCatchFunction;
+import org.apache.hadoop.hdfs.server.federation.router.ConnectionContext;
+import org.apache.hadoop.hdfs.server.federation.router.RemoteLocationContext;
+import org.apache.hadoop.hdfs.server.federation.router.RemoteMethod;
+import org.apache.hadoop.hdfs.server.federation.router.RemoteResult;
+import org.apache.hadoop.hdfs.server.federation.router.Router;
+import org.apache.hadoop.hdfs.server.federation.router.RouterRpcClient;
+import org.apache.hadoop.hdfs.server.federation.router.RouterRpcMonitor;
+import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer;
+import org.apache.hadoop.hdfs.server.federation.router.RouterStateIdContext;
+import org.apache.hadoop.hdfs.server.federation.router.ThreadLocalContext;
+import org.apache.hadoop.hdfs.server.federation.router.async.utils.ApplyFunction;
+import org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncApplyFunction;
+import org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncCatchFunction;
 import org.apache.hadoop.ipc.Client;
 import org.apache.hadoop.ipc.StandbyException;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -50,18 +60,18 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 
 import static org.apache.hadoop.hdfs.server.federation.fairness.RouterRpcFairnessConstants.CONCURRENT_NS;
-import static org.apache.hadoop.hdfs.server.federation.router.async.Async.warpCompletionException;
-import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncApply;
-import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncApplyUseExecutor;
-import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncCatch;
-import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncComplete;
-import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncCompleteWith;
-import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncFinally;
-import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncForEach;
-import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncReturn;
-import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncThrowException;
-import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncTry;
-import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.getCompletableFuture;
+import static org.apache.hadoop.hdfs.server.federation.router.async.utils.Async.warpCompletionException;
+import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncApply;
+import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncApplyUseExecutor;
+import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncCatch;
+import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncComplete;
+import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncCompleteWith;
+import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncFinally;
+import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncForEach;
+import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncReturn;
+import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncThrowException;
+import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncTry;
+import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.getCompletableFuture;
 
 /**
  * The {@code RouterAsyncRpcClient} class extends the functionality of the base

+ 65 - 4
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterAsyncSnapshot.java → hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncSnapshot.java

@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.hdfs.server.federation.router;
+package org.apache.hadoop.hdfs.server.federation.router.async;
 
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
@@ -26,7 +26,13 @@ import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
 import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
 import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamespaceInfo;
 import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
-import org.apache.hadoop.hdfs.server.federation.router.async.ApplyFunction;
+import org.apache.hadoop.hdfs.server.federation.router.RemoteMethod;
+import org.apache.hadoop.hdfs.server.federation.router.RemoteParam;
+import org.apache.hadoop.hdfs.server.federation.router.RemoteResult;
+import org.apache.hadoop.hdfs.server.federation.router.RouterRpcClient;
+import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer;
+import org.apache.hadoop.hdfs.server.federation.router.RouterSnapshot;
+import org.apache.hadoop.hdfs.server.federation.router.async.utils.ApplyFunction;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 
 import java.io.IOException;
@@ -35,8 +41,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncApply;
-import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncReturn;
+import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncApply;
+import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncReturn;
 
 /**
  * Module that implements all the asynchronous RPC calls related to snapshots in
@@ -57,6 +63,16 @@ public class RouterAsyncSnapshot extends RouterSnapshot {
     this.namenodeResolver = rpcServer.getNamenodeResolver();
   }
 
+  /**
+   * Asynchronously creates a snapshot with the given root and name.
+   * This method checks the operation category and then invokes the createSnapshot
+   * method concurrently across all namespaces, returning the first successful response.
+   *
+   * @param snapshotRoot The root path of the snapshot.
+   * @param snapshotName The name of the snapshot.
+   * @return The path of the created snapshot.
+   * @throws IOException If an I/O error occurs.
+   */
   @Override
   public String createSnapshot(String snapshotRoot, String snapshotName) throws IOException {
     rpcServer.checkOperation(NameNode.OperationCategory.WRITE);
@@ -89,6 +105,15 @@ public class RouterAsyncSnapshot extends RouterSnapshot {
     return asyncReturn(String.class);
   }
 
+  /**
+   * Asynchronously get an array of snapshottable directory listings.
+   * This method checks the operation category and then invokes the
+   * getSnapshottableDirListing method concurrently across all namespaces, merging
+   * the results into a single array.
+   *
+   * @return Array of SnapshottableDirectoryStatus.
+   * @throws IOException If an I/O error occurs.
+   */
   @Override
   public SnapshottableDirectoryStatus[] getSnapshottableDirListing() throws IOException {
     rpcServer.checkOperation(NameNode.OperationCategory.READ);
@@ -103,6 +128,16 @@ public class RouterAsyncSnapshot extends RouterSnapshot {
     return asyncReturn(SnapshottableDirectoryStatus[].class);
   }
 
+  /**
+   * Asynchronously get an array of snapshot listings for the given snapshot root.
+   * This method checks the operation category and then invokes the
+   * getSnapshotListing method, either sequentially or concurrently based on the
+   * configuration, and returns the merged results.
+   *
+   * @param snapshotRoot The root path of the snapshots.
+   * @return Array of SnapshotStatus.
+   * @throws IOException If an I/O error occurs.
+   */
   @Override
   public SnapshotStatus[] getSnapshotListing(String snapshotRoot) throws IOException {
     rpcServer.checkOperation(NameNode.OperationCategory.READ);
@@ -145,6 +180,18 @@ public class RouterAsyncSnapshot extends RouterSnapshot {
     return asyncReturn(SnapshotStatus[].class);
   }
 
+  /**
+   * Asynchronously get a snapshot diff report for the given root and snapshot names.
+   * This method checks the operation category and then invokes the
+   * getSnapshotDiffReport method, either sequentially or concurrently based on the
+   * configuration, and returns the result.
+   *
+   * @param snapshotRoot The root path of the snapshot.
+   * @param earlierSnapshotName The name of the earlier snapshot.
+   * @param laterSnapshotName The name of the later snapshot.
+   * @return SnapshotDiffReport for the snapshots.
+   * @throws IOException If an I/O error occurs.
+   */
   @Override
   public SnapshotDiffReport getSnapshotDiffReport(
       String snapshotRoot, String earlierSnapshotName,
@@ -169,6 +216,20 @@ public class RouterAsyncSnapshot extends RouterSnapshot {
     }
   }
 
+  /**
+   * Asynchronously get a snapshot diff report listing for the given root and snapshot names.
+   * This method checks the operation category and then invokes the
+   * getSnapshotDiffReportListing method, either sequentially or concurrently based
+   * on the configuration, and returns the result.
+   *
+   * @param snapshotRoot The root path of the snapshot.
+   * @param earlierSnapshotName The name of the earlier snapshot.
+   * @param laterSnapshotName The name of the later snapshot.
+   * @param startPath The starting path for the diff report.
+   * @param index The index for the diff report listing.
+   * @return SnapshotDiffReportListing for the snapshots.
+   * @throws IOException If an I/O error occurs.
+   */
   @Override
   public SnapshotDiffReportListing getSnapshotDiffReportListing(
       String snapshotRoot, String earlierSnapshotName, String laterSnapshotName,

+ 29 - 2
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterAsyncStoragePolicy.java → hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncStoragePolicy.java

@@ -15,17 +15,27 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.hdfs.server.federation.router;
+package org.apache.hadoop.hdfs.server.federation.router.async;
 
 import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
 import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
+import org.apache.hadoop.hdfs.server.federation.router.RemoteMethod;
+import org.apache.hadoop.hdfs.server.federation.router.RemoteParam;
+import org.apache.hadoop.hdfs.server.federation.router.RouterRpcClient;
+import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer;
+import org.apache.hadoop.hdfs.server.federation.router.RouterStoragePolicy;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 
 import java.io.IOException;
 import java.util.List;
 
-import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncReturn;
+import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncReturn;
 
+/**
+ * Module that implements all the asynchronous RPC calls in
+ * {@link org.apache.hadoop.hdfs.protocol.ClientProtocol} related to
+ * Storage Policy in the {@link RouterRpcServer}.
+ */
 public class RouterAsyncStoragePolicy extends RouterStoragePolicy {
   /** RPC server to receive client calls. */
   private final RouterRpcServer rpcServer;
@@ -38,6 +48,15 @@ public class RouterAsyncStoragePolicy extends RouterStoragePolicy {
     this.rpcClient = this.rpcServer.getRPCClient();
   }
 
+  /**
+   * Asynchronously get the storage policy for a given path.
+   * This method checks the operation category and then invokes the
+   * getStoragePolicy method sequentially for the given path.
+   *
+   * @param path The path for which to retrieve the storage policy.
+   * @return The BlockStoragePolicy for the given path.
+   * @throws IOException If an I/O error occurs.
+   */
   @Override
   public BlockStoragePolicy getStoragePolicy(String path)
       throws IOException {
@@ -52,6 +71,14 @@ public class RouterAsyncStoragePolicy extends RouterStoragePolicy {
     return asyncReturn(BlockStoragePolicy.class);
   }
 
+  /**
+   * Asynchronously get an array of all available storage policies.
+   * This method checks the operation category and then invokes the
+   * getStoragePolicies method across all available namespaces.
+   *
+   * @return An array of BlockStoragePolicy.
+   * @throws IOException If an I/O error occurs.
+   */
   @Override
   public BlockStoragePolicy[] getStoragePolicies() throws IOException {
     rpcServer.checkOperation(NameNode.OperationCategory.READ);

+ 8 - 5
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/async/RouterAsyncUserProtocol.java → hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncUserProtocol.java

@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.hdfs.server.federation.async;
+package org.apache.hadoop.hdfs.server.federation.router.async;
 
 import org.apache.hadoop.hdfs.server.federation.router.RemoteMethod;
 import org.apache.hadoop.hdfs.server.federation.router.RouterRpcClient;
@@ -27,7 +27,7 @@ import java.util.Map;
 import java.util.Set;
 import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
 import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamespaceInfo;
-import org.apache.hadoop.hdfs.server.federation.router.async.ApplyFunction;
+import org.apache.hadoop.hdfs.server.federation.router.async.utils.ApplyFunction;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.security.Groups;
 import org.apache.hadoop.security.RefreshUserMappingsProtocol;
@@ -38,9 +38,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import static org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer.merge;
-import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncApply;
-import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncComplete;
-import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncReturn;
+import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncApply;
+import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncComplete;
+import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncReturn;
 
 /**
  * Module that implements all the asynchronous RPC calls in
@@ -67,6 +67,7 @@ public class RouterAsyncUserProtocol extends RouterUserProtocol {
 
   /**
    * Asynchronously refresh user to group mappings.
+   *
    * @throws IOException  raised on errors performing I/O.
    */
   @Override
@@ -86,6 +87,7 @@ public class RouterAsyncUserProtocol extends RouterUserProtocol {
 
   /**
    * Asynchronously refresh superuser proxy group list.
+   *
    * @throws IOException  raised on errors performing I/O.
    */
   @Override
@@ -105,6 +107,7 @@ public class RouterAsyncUserProtocol extends RouterUserProtocol {
 
   /**
    * Asynchronously get the groups which are mapped to the given user.
+   *
    * @param user The user to get the groups for.
    * @return The set of groups the user belongs to.
    * @throws IOException raised on errors performing I/O.

+ 0 - 4
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/package-info.java

@@ -21,10 +21,6 @@
  * Distributed File System (HDFS) Federation router. These classes are designed to work with
  * the Hadoop ecosystem, providing utilities and interfaces to perform non-blocking tasks that
  * can improve the performance and responsiveness of HDFS operations.
- *
- * <p>These classes work together to enable complex asynchronous workflows, making it easier to
- * write code that can handle long-running tasks without blocking, thus improving the overall
- * efficiency and scalability of HDFS operations.</p>
  */
 @InterfaceAudience.Private
 @InterfaceStability.Evolving

+ 2 - 2
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/ApplyFunction.java → hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/utils/ApplyFunction.java

@@ -15,13 +15,13 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.hdfs.server.federation.router.async;
+package org.apache.hadoop.hdfs.server.federation.router.async.utils;
 
 import java.io.IOException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 
-import static org.apache.hadoop.hdfs.server.federation.router.async.Async.warpCompletionException;
+import static org.apache.hadoop.hdfs.server.federation.router.async.utils.Async.warpCompletionException;
 
 /**
  * Represents a function that accepts a value of type T and produces a result of type R.

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/Async.java → hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/utils/Async.java

@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.hdfs.server.federation.router.async;
+package org.apache.hadoop.hdfs.server.federation.router.async.utils;
 
 import java.io.IOException;
 import java.util.concurrent.CompletableFuture;

+ 2 - 2
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/AsyncApplyFunction.java → hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/utils/AsyncApplyFunction.java

@@ -15,13 +15,13 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.hdfs.server.federation.router.async;
+package org.apache.hadoop.hdfs.server.federation.router.async.utils;
 
 import java.io.IOException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 
-import static org.apache.hadoop.hdfs.server.federation.router.async.Async.warpCompletionException;
+import static org.apache.hadoop.hdfs.server.federation.router.async.utils.Async.warpCompletionException;
 
 /**
  * The AsyncApplyFunction interface represents a function that

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/AsyncBiFunction.java → hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/utils/AsyncBiFunction.java

@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.hdfs.server.federation.router.async;
+package org.apache.hadoop.hdfs.server.federation.router.async.utils;
 
 import java.io.IOException;
 import java.util.concurrent.CompletableFuture;

+ 3 - 3
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/AsyncCatchFunction.java → hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/utils/AsyncCatchFunction.java

@@ -15,13 +15,13 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.hdfs.server.federation.router.async;
+package org.apache.hadoop.hdfs.server.federation.router.async.utils;
 
 import java.io.IOException;
 import java.util.concurrent.CompletableFuture;
 
-import static org.apache.hadoop.hdfs.server.federation.router.async.Async.unWarpCompletionException;
-import static org.apache.hadoop.hdfs.server.federation.router.async.Async.warpCompletionException;
+import static org.apache.hadoop.hdfs.server.federation.router.async.utils.Async.unWarpCompletionException;
+import static org.apache.hadoop.hdfs.server.federation.router.async.utils.Async.warpCompletionException;
 
 /**
  * The AsyncCatchFunction interface represents a function that handles exceptions

+ 2 - 2
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/AsyncForEachRun.java → hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/utils/AsyncForEachRun.java

@@ -15,14 +15,14 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.hdfs.server.federation.router.async;
+package org.apache.hadoop.hdfs.server.federation.router.async.utils;
 
 import java.io.IOException;
 import java.util.Iterator;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 
-import static org.apache.hadoop.hdfs.server.federation.router.async.Async.warpCompletionException;
+import static org.apache.hadoop.hdfs.server.federation.router.async.utils.Async.warpCompletionException;
 
 /**
  * The AsyncForEachRun class is part of the asynchronous operation utilities

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/AsyncRun.java → hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/utils/AsyncRun.java

@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.hdfs.server.federation.router.async;
+package org.apache.hadoop.hdfs.server.federation.router.async.utils;
 
 import java.io.IOException;
 import java.util.concurrent.CompletableFuture;

+ 3 - 3
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/AsyncUtil.java → hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/utils/AsyncUtil.java

@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.hdfs.server.federation.router.async;
+package org.apache.hadoop.hdfs.server.federation.router.async.utils;
 
 import java.io.IOException;
 import java.util.Collection;
@@ -25,8 +25,8 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
 import java.util.function.Function;
 
-import static org.apache.hadoop.hdfs.server.federation.router.async.Async.CUR_COMPLETABLE_FUTURE;
-import static org.apache.hadoop.hdfs.server.federation.router.async.Async.warpCompletionException;
+import static org.apache.hadoop.hdfs.server.federation.router.async.utils.Async.CUR_COMPLETABLE_FUTURE;
+import static org.apache.hadoop.hdfs.server.federation.router.async.utils.Async.warpCompletionException;
 
 /**
  * The AsyncUtil class provides a collection of utility methods to simplify

+ 4 - 4
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/CatchFunction.java → hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/utils/CatchFunction.java

@@ -15,13 +15,13 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.hdfs.server.federation.router.async;
+package org.apache.hadoop.hdfs.server.federation.router.async.utils;
 
 import java.io.IOException;
 import java.util.concurrent.CompletableFuture;
 
-import static org.apache.hadoop.hdfs.server.federation.router.async.Async.unWarpCompletionException;
-import static org.apache.hadoop.hdfs.server.federation.router.async.Async.warpCompletionException;
+import static org.apache.hadoop.hdfs.server.federation.router.async.utils.Async.unWarpCompletionException;
+import static org.apache.hadoop.hdfs.server.federation.router.async.utils.Async.warpCompletionException;
 
 /**
  * The {@code CatchFunction} interface represents a function that handles exceptions
@@ -58,7 +58,7 @@ import static org.apache.hadoop.hdfs.server.federation.router.async.Async.warpCo
  */
 @FunctionalInterface
 public interface CatchFunction<R, E extends Throwable>
-    extends Async<R>{
+    extends Async<R> {
 
   /**
    * Applies this catch function to the given result and exception.

+ 2 - 2
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/FinallyFunction.java → hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/utils/FinallyFunction.java

@@ -15,12 +15,12 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.hdfs.server.federation.router.async;
+package org.apache.hadoop.hdfs.server.federation.router.async.utils;
 
 import java.io.IOException;
 import java.util.concurrent.CompletableFuture;
 
-import static org.apache.hadoop.hdfs.server.federation.router.async.Async.warpCompletionException;
+import static org.apache.hadoop.hdfs.server.federation.router.async.utils.Async.warpCompletionException;
 
 /**
  * The {@code FinallyFunction} interface represents a function that is used to perform

+ 5 - 1
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/async/package-info.java → hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/utils/package-info.java

@@ -21,11 +21,15 @@
  * Distributed File System (HDFS) Federation router. These classes are designed to work with
  * the Hadoop ecosystem, providing utilities and interfaces to perform non-blocking tasks that
  * can improve the performance and responsiveness of HDFS operations.
+ *
+ * <p>These classes work together to enable complex asynchronous workflows, making it easier to
+ * write code that can handle long-running tasks without blocking, thus improving the overall
+ * efficiency and scalability of HDFS operations.</p>
  */
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
 
-package org.apache.hadoop.hdfs.server.federation.async;
+package org.apache.hadoop.hdfs.server.federation.router.async.utils;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;

+ 2 - 2
hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestAsyncRpcProtocolPBUtil.java

@@ -19,7 +19,7 @@
 package org.apache.hadoop.hdfs.protocolPB;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil;
+import org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil;
 import org.apache.hadoop.ipc.Client;
 import org.apache.hadoop.ipc.ProtobufRpcEngine2;
 import org.apache.hadoop.ipc.RPC;
@@ -42,7 +42,7 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.concurrent.ForkJoinPool;
 
-import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.syncReturn;
+import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.syncReturn;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestRouterClientSideTranslatorPB.java

@@ -62,7 +62,7 @@ import static org.apache.hadoop.fs.permission.FsAction.ALL;
 import static org.apache.hadoop.fs.permission.FsAction.NONE;
 import static org.apache.hadoop.fs.permission.FsAction.READ;
 import static org.apache.hadoop.fs.permission.FsAction.READ_WRITE;
-import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.syncReturn;
+import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.syncReturn;
 import static org.apache.hadoop.hdfs.server.namenode.AclTestHelpers.aclEntry;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;

+ 0 - 195
hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAsyncCacheAdmin.java

@@ -1,195 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hdfs.server.federation.router;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CacheFlag;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
-import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
-import org.apache.hadoop.hdfs.protocol.CachePoolEntry;
-import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
-import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster;
-import org.apache.hadoop.hdfs.server.federation.MockResolver;
-import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
-import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedEntries;
-import org.apache.hadoop.ipc.CallerContext;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.mockito.Mockito;
-
-import java.io.IOException;
-import java.util.EnumSet;
-import java.util.concurrent.TimeUnit;
-
-import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.NAMENODES;
-import static org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.DEFAULT_HEARTBEAT_INTERVAL_MS;
-import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT;
-import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT;
-import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.syncReturn;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-public class TestRouterAsyncCacheAdmin {
-  private static Configuration routerConf;
-  /** Federated HDFS cluster. */
-  private static MiniRouterDFSCluster cluster;
-  private static String ns0;
-
-  /** Random Router for this federated cluster. */
-  private MiniRouterDFSCluster.RouterContext router;
-  private FileSystem routerFs;
-  private RouterRpcServer routerRpcServer;
-  private RouterAsyncCacheAdmin asyncCacheAdmin;
-
-  @BeforeClass
-  public static void setUpCluster() throws Exception {
-    cluster = new MiniRouterDFSCluster(true, 1, 2,
-        DEFAULT_HEARTBEAT_INTERVAL_MS, 1000);
-    cluster.setNumDatanodesPerNameservice(3);
-
-    cluster.startCluster();
-
-    // Making one Namenode active per nameservice
-    if (cluster.isHighAvailability()) {
-      for (String ns : cluster.getNameservices()) {
-        cluster.switchToActive(ns, NAMENODES[0]);
-        cluster.switchToStandby(ns, NAMENODES[1]);
-      }
-    }
-    // Start routers with only an RPC service
-    routerConf = new RouterConfigBuilder()
-        .rpc()
-        .build();
-
-    // Reduce the number of RPC clients threads to overload the Router easy
-    routerConf.setInt(RBFConfigKeys.DFS_ROUTER_CLIENT_THREADS_SIZE, 1);
-    routerConf.setInt(DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT, 1);
-    routerConf.setInt(DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT, 1);
-    // We decrease the DN cache times to make the test faster
-    routerConf.setTimeDuration(
-        RBFConfigKeys.DN_REPORT_CACHE_EXPIRE, 1, TimeUnit.SECONDS);
-    cluster.addRouterOverrides(routerConf);
-    // Start routers with only an RPC service
-    cluster.startRouters();
-
-    // Register and verify all NNs with all routers
-    cluster.registerNamenodes();
-    cluster.waitNamenodeRegistration();
-    cluster.waitActiveNamespaces();
-    ns0 = cluster.getNameservices().get(0);
-  }
-
-  @AfterClass
-  public static void shutdownCluster() throws Exception {
-    if (cluster != null) {
-      cluster.shutdown();
-    }
-  }
-
-  @Before
-  public void setUp() throws IOException {
-    router = cluster.getRandomRouter();
-    routerFs = router.getFileSystem();
-    routerRpcServer = router.getRouterRpcServer();
-    routerRpcServer.initAsyncThreadPool();
-    RouterAsyncRpcClient asyncRpcClient = new RouterAsyncRpcClient(
-        routerConf, router.getRouter(), routerRpcServer.getNamenodeResolver(),
-        routerRpcServer.getRPCMonitor(),
-        routerRpcServer.getRouterStateIdContext());
-    RouterRpcServer spy = Mockito.spy(routerRpcServer);
-    Mockito.when(spy.getRPCClient()).thenReturn(asyncRpcClient);
-    asyncCacheAdmin = new RouterAsyncCacheAdmin(spy);
-
-    // Create mock locations
-    MockResolver resolver = (MockResolver) router.getRouter().getSubclusterResolver();
-    resolver.addLocation("/", ns0, "/");
-    FSDataOutputStream fsDataOutputStream = routerFs.create(
-        new Path("/testCache.file"), true);
-    fsDataOutputStream.write(new byte[1024]);
-    fsDataOutputStream.close();
-  }
-
-  @After
-  public void tearDown() throws IOException {
-    // clear client context
-    CallerContext.setCurrent(null);
-    boolean delete = routerFs.delete(new Path("/testCache.file"));
-    assertTrue(delete);
-    if (routerFs != null) {
-      routerFs.close();
-    }
-  }
-
-  @Test
-  public void testRouterAsyncCacheAdmin() throws Exception {
-    asyncCacheAdmin.addCachePool(new CachePoolInfo("pool"));
-    syncReturn(null);
-
-    CacheDirectiveInfo path = new CacheDirectiveInfo.Builder().
-        setPool("pool").
-        setPath(new Path("/testCache.file")).
-        build();
-    asyncCacheAdmin.addCacheDirective(path, EnumSet.of(CacheFlag.FORCE));
-    long result = syncReturn(long.class);
-    assertEquals(1, result);
-
-    asyncCacheAdmin.listCachePools("");
-    BatchedEntries<CachePoolEntry> cachePoolEntries = syncReturn(BatchedEntries.class);
-    assertEquals("pool", cachePoolEntries.get(0).getInfo().getPoolName());
-
-    CacheDirectiveInfo filter = new CacheDirectiveInfo.Builder().
-        setPool("pool").
-        build();
-    asyncCacheAdmin.listCacheDirectives(0, filter);
-    BatchedEntries<CacheDirectiveEntry> cacheDirectiveEntries = syncReturn(BatchedEntries.class);
-    assertEquals(new Path("/testCache.file"), cacheDirectiveEntries.get(0).getInfo().getPath());
-
-    CachePoolInfo pool = new CachePoolInfo("pool").setOwnerName("pool_user");
-    asyncCacheAdmin.modifyCachePool(pool);
-    syncReturn(null);
-
-    asyncCacheAdmin.listCachePools("");
-    cachePoolEntries = syncReturn(BatchedEntries.class);
-    assertEquals("pool_user", cachePoolEntries.get(0).getInfo().getOwnerName());
-
-    path = new CacheDirectiveInfo.Builder().
-        setPool("pool").
-        setPath(new Path("/testCache.file")).
-        setReplication((short) 2).
-        setId(1L).
-        build();
-    asyncCacheAdmin.modifyCacheDirective(path, EnumSet.of(CacheFlag.FORCE));
-    syncReturn(null);
-
-    asyncCacheAdmin.listCacheDirectives(0, filter);
-    cacheDirectiveEntries = syncReturn(BatchedEntries.class);
-    assertEquals(Short.valueOf((short) 2), cacheDirectiveEntries.get(0).getInfo().getReplication());
-
-    asyncCacheAdmin.removeCacheDirective(1L);
-    syncReturn(null);
-    asyncCacheAdmin.removeCachePool("pool");
-    syncReturn(null);
-  }
-}

+ 0 - 191
hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAsyncRpcServer.java

@@ -1,191 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.server.federation.router;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
-import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.protocol.HdfsConstants;
-import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster;
-import org.apache.hadoop.hdfs.server.federation.MockResolver;
-import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
-import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
-import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
-import org.apache.hadoop.ipc.CallerContext;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.mockito.Mockito;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
-import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.NAMENODES;
-import static org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.DEFAULT_HEARTBEAT_INTERVAL_MS;
-import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT;
-import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT;
-import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.syncReturn;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-
-/**
- * Used to test the async functionality of {@link RouterRpcServer}.
- */
-public class TestRouterAsyncRpcServer {
-  private static Configuration routerConf;
-  /** Federated HDFS cluster. */
-  private static MiniRouterDFSCluster cluster;
-  private static String ns0;
-
-  /** Random Router for this federated cluster. */
-  private MiniRouterDFSCluster.RouterContext router;
-  private FileSystem routerFs;
-  private RouterRpcServer asyncRouterRpcServer;
-
-  @BeforeClass
-  public static void setUpCluster() throws Exception {
-    cluster = new MiniRouterDFSCluster(true, 1, 2,
-        DEFAULT_HEARTBEAT_INTERVAL_MS, 1000);
-    cluster.setNumDatanodesPerNameservice(3);
-
-    cluster.startCluster();
-
-    // Making one Namenode active per nameservice
-    if (cluster.isHighAvailability()) {
-      for (String ns : cluster.getNameservices()) {
-        cluster.switchToActive(ns, NAMENODES[0]);
-        cluster.switchToStandby(ns, NAMENODES[1]);
-      }
-    }
-    // Start routers with only an RPC service
-    routerConf = new RouterConfigBuilder()
-        .rpc()
-        .build();
-
-    // Reduce the number of RPC clients threads to overload the Router easy
-    routerConf.setInt(RBFConfigKeys.DFS_ROUTER_CLIENT_THREADS_SIZE, 1);
-    routerConf.setInt(DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT, 1);
-    routerConf.setInt(DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT, 1);
-    // We decrease the DN cache times to make the test faster
-    routerConf.setTimeDuration(
-        RBFConfigKeys.DN_REPORT_CACHE_EXPIRE, 1, TimeUnit.SECONDS);
-    cluster.addRouterOverrides(routerConf);
-    // Start routers with only an RPC service
-    cluster.startRouters();
-
-    // Register and verify all NNs with all routers
-    cluster.registerNamenodes();
-    cluster.waitNamenodeRegistration();
-    cluster.waitActiveNamespaces();
-    ns0 = cluster.getNameservices().get(0);
-  }
-
-  @AfterClass
-  public static void shutdownCluster() throws Exception {
-    if (cluster != null) {
-      cluster.shutdown();
-    }
-  }
-
-  @Before
-  public void setUp() throws IOException {
-    router = cluster.getRandomRouter();
-    routerFs = router.getFileSystem();
-    RouterRpcServer routerRpcServer = router.getRouterRpcServer();
-    routerRpcServer.initAsyncThreadPool();
-    RouterAsyncRpcClient asyncRpcClient = new RouterAsyncRpcClient(
-        routerConf, router.getRouter(), routerRpcServer.getNamenodeResolver(),
-        routerRpcServer.getRPCMonitor(),
-        routerRpcServer.getRouterStateIdContext());
-    asyncRouterRpcServer = Mockito.spy(routerRpcServer);
-    Mockito.when(asyncRouterRpcServer.getRPCClient()).thenReturn(asyncRpcClient);
-
-    // Create mock locations
-    MockResolver resolver = (MockResolver) router.getRouter().getSubclusterResolver();
-    resolver.addLocation("/", ns0, "/");
-    FsPermission permission = new FsPermission("705");
-    routerFs.mkdirs(new Path("/testdir"), permission);
-  }
-
-  @After
-  public void tearDown() throws IOException {
-    // clear client context
-    CallerContext.setCurrent(null);
-    boolean delete = routerFs.delete(new Path("/testdir"));
-    assertTrue(delete);
-    if (routerFs != null) {
-      routerFs.close();
-    }
-  }
-
-  /**
-   * Test that the async RPC server can invoke a method at an available Namenode.
-   */
-  @Test
-  public void testInvokeAtAvailableNsAsync() throws Exception {
-    RemoteMethod method = new RemoteMethod("getStoragePolicies");
-    asyncRouterRpcServer.invokeAtAvailableNsAsync(method, BlockStoragePolicy[].class);
-    BlockStoragePolicy[] storagePolicies = syncReturn(BlockStoragePolicy[].class);
-    assertEquals(8, storagePolicies.length);
-  }
-
-  /**
-   * Test get create location async.
-   */
-  @Test
-  public void testGetCreateLocationAsync() throws Exception {
-    final List<RemoteLocation> locations =
-        asyncRouterRpcServer.getLocationsForPath("/testdir", true);
-    asyncRouterRpcServer.getCreateLocationAsync("/testdir", locations);
-    RemoteLocation remoteLocation = syncReturn(RemoteLocation.class);
-    assertNotNull(remoteLocation);
-    assertEquals(ns0, remoteLocation.getNameserviceId());
-  }
-
-  /**
-   * Test get datanode report async.
-   */
-  @Test
-  public void testGetDatanodeReportAsync() throws Exception {
-    asyncRouterRpcServer.getDatanodeReportAsync(
-        HdfsConstants.DatanodeReportType.ALL, true, 0);
-    DatanodeInfo[] datanodeInfos = syncReturn(DatanodeInfo[].class);
-    assertEquals(3, datanodeInfos.length);
-
-    // Get the namespace where the datanode is located
-    asyncRouterRpcServer.getDatanodeStorageReportMapAsync(HdfsConstants.DatanodeReportType.ALL);
-    Map<String, DatanodeStorageReport[]> map = syncReturn(Map.class);
-    assertEquals(1, map.size());
-    assertEquals(3, map.get(ns0).length);
-
-    DatanodeInfo[] slowDatanodeReport1 =
-        asyncRouterRpcServer.getSlowDatanodeReport(true, 0);
-
-    asyncRouterRpcServer.getSlowDatanodeReportAsync(true, 0);
-    DatanodeInfo[] slowDatanodeReport2 = syncReturn(DatanodeInfo[].class);
-    assertEquals(slowDatanodeReport1, slowDatanodeReport2);
-  }
-}

+ 0 - 163
hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAsyncStoragePolicy.java

@@ -1,163 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.server.federation.router;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
-import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster;
-import org.apache.hadoop.hdfs.server.federation.MockResolver;
-import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
-import org.apache.hadoop.ipc.CallerContext;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.mockito.Mockito;
-
-import java.io.IOException;
-import java.util.concurrent.TimeUnit;
-
-import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.NAMENODES;
-import static org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.DEFAULT_HEARTBEAT_INTERVAL_MS;
-import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT;
-import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT;
-import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.syncReturn;
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertTrue;
-
-public class TestRouterAsyncStoragePolicy {
-  private static Configuration routerConf;
-  /** Federated HDFS cluster. */
-  private static MiniRouterDFSCluster cluster;
-  private static String ns0;
-
-  /** Random Router for this federated cluster. */
-  private MiniRouterDFSCluster.RouterContext router;
-  private FileSystem routerFs;
-  private RouterRpcServer routerRpcServer;
-  private RouterAsyncStoragePolicy asyncStoragePolicy;
-
-  private final String testfilePath = "/testdir/testAsyncStoragePolicy.file";
-
-  @BeforeClass
-  public static void setUpCluster() throws Exception {
-    cluster = new MiniRouterDFSCluster(true, 1, 2,
-        DEFAULT_HEARTBEAT_INTERVAL_MS, 1000);
-    cluster.setNumDatanodesPerNameservice(3);
-
-    cluster.startCluster();
-
-    // Making one Namenode active per nameservice
-    if (cluster.isHighAvailability()) {
-      for (String ns : cluster.getNameservices()) {
-        cluster.switchToActive(ns, NAMENODES[0]);
-        cluster.switchToStandby(ns, NAMENODES[1]);
-      }
-    }
-    // Start routers with only an RPC service
-    routerConf = new RouterConfigBuilder()
-        .rpc()
-        .build();
-
-    // Reduce the number of RPC clients threads to overload the Router easy
-    routerConf.setInt(RBFConfigKeys.DFS_ROUTER_CLIENT_THREADS_SIZE, 1);
-    routerConf.setInt(DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT, 1);
-    routerConf.setInt(DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT, 1);
-    // We decrease the DN cache times to make the test faster
-    routerConf.setTimeDuration(
-        RBFConfigKeys.DN_REPORT_CACHE_EXPIRE, 1, TimeUnit.SECONDS);
-    cluster.addRouterOverrides(routerConf);
-    // Start routers with only an RPC service
-    cluster.startRouters();
-
-    // Register and verify all NNs with all routers
-    cluster.registerNamenodes();
-    cluster.waitNamenodeRegistration();
-    cluster.waitActiveNamespaces();
-    ns0 = cluster.getNameservices().get(0);
-  }
-
-  @AfterClass
-  public static void shutdownCluster() throws Exception {
-    if (cluster != null) {
-      cluster.shutdown();
-    }
-  }
-
-  @Before
-  public void setUp() throws IOException {
-    router = cluster.getRandomRouter();
-    routerFs = router.getFileSystem();
-    routerRpcServer = router.getRouterRpcServer();
-    routerRpcServer.initAsyncThreadPool();
-    RouterAsyncRpcClient asyncRpcClient = new RouterAsyncRpcClient(
-        routerConf, router.getRouter(), routerRpcServer.getNamenodeResolver(),
-        routerRpcServer.getRPCMonitor(),
-        routerRpcServer.getRouterStateIdContext());
-    RouterRpcServer spy = Mockito.spy(routerRpcServer);
-    Mockito.when(spy.getRPCClient()).thenReturn(asyncRpcClient);
-    asyncStoragePolicy = new RouterAsyncStoragePolicy(spy);
-
-    // Create mock locations
-    MockResolver resolver = (MockResolver) router.getRouter().getSubclusterResolver();
-    resolver.addLocation("/", ns0, "/");
-    FsPermission permission = new FsPermission("705");
-    routerFs.mkdirs(new Path("/testdir"), permission);
-    FSDataOutputStream fsDataOutputStream = routerFs.create(
-        new Path(testfilePath), true);
-    fsDataOutputStream.write(new byte[1024]);
-    fsDataOutputStream.close();
-  }
-
-  @After
-  public void tearDown() throws IOException {
-    // clear client context
-    CallerContext.setCurrent(null);
-    boolean delete = routerFs.delete(new Path("/testdir"));
-    assertTrue(delete);
-    if (routerFs != null) {
-      routerFs.close();
-    }
-  }
-
-  @Test
-  public void testRouterAsyncStoragePolicy() throws Exception {
-    BlockStoragePolicy[] storagePolicies = cluster.getNamenodes().get(0)
-        .getClient().getStoragePolicies();
-    asyncStoragePolicy.getStoragePolicies();
-    BlockStoragePolicy[] storagePoliciesAsync = syncReturn(BlockStoragePolicy[].class);
-    assertArrayEquals(storagePolicies, storagePoliciesAsync);
-
-    asyncStoragePolicy.getStoragePolicy(testfilePath);
-    BlockStoragePolicy blockStoragePolicy1 = syncReturn(BlockStoragePolicy.class);
-
-    asyncStoragePolicy.setStoragePolicy(testfilePath, "COLD");
-    syncReturn(null);
-    asyncStoragePolicy.getStoragePolicy(testfilePath);
-    BlockStoragePolicy blockStoragePolicy2 = syncReturn(BlockStoragePolicy.class);
-    assertNotEquals(blockStoragePolicy1, blockStoragePolicy2);
-    assertEquals("COLD", blockStoragePolicy2.getName());
-  }
-}

+ 1 - 2
hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/async/RouterAsyncProtocolTestBase.java → hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncProtocolTestBase.java

@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.hdfs.server.federation.async;
+package org.apache.hadoop.hdfs.server.federation.router.async;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -25,7 +25,6 @@ import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster;
 import org.apache.hadoop.hdfs.server.federation.MockResolver;
 import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
 import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;
-import org.apache.hadoop.hdfs.server.federation.router.RouterAsyncRpcClient;
 import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer;
 import org.apache.hadoop.ipc.CallerContext;
 import org.junit.After;

+ 102 - 0
hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncCacheAdmin.java

@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.federation.router.async;
+
+import org.apache.hadoop.fs.CacheFlag;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
+import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
+import org.apache.hadoop.hdfs.protocol.CachePoolEntry;
+import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
+import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedEntries;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.EnumSet;
+import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.syncReturn;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Used to test the functionality of {@link RouterAsyncCacheAdmin}.
+ */
+public class TestRouterAsyncCacheAdmin extends RouterAsyncProtocolTestBase {
+  private RouterAsyncCacheAdmin asyncCacheAdmin;
+
+  @Before
+  public void setup() throws IOException {
+    asyncCacheAdmin = new RouterAsyncCacheAdmin(getRouterAsyncRpcServer());
+    FSDataOutputStream fsDataOutputStream = getRouterFs().create(
+        new Path("/testCache.file"), true);
+    fsDataOutputStream.write(new byte[1024]);
+    fsDataOutputStream.close();
+  }
+
+  @Test
+  public void testRouterAsyncCacheAdmin() throws Exception {
+    asyncCacheAdmin.addCachePool(new CachePoolInfo("pool"));
+    syncReturn(null);
+
+    CacheDirectiveInfo path = new CacheDirectiveInfo.Builder().
+        setPool("pool").
+        setPath(new Path("/testCache.file")).
+        build();
+    asyncCacheAdmin.addCacheDirective(path, EnumSet.of(CacheFlag.FORCE));
+    long result = syncReturn(long.class);
+    assertEquals(1, result);
+
+    asyncCacheAdmin.listCachePools("");
+    BatchedEntries<CachePoolEntry> cachePoolEntries = syncReturn(BatchedEntries.class);
+    assertEquals("pool", cachePoolEntries.get(0).getInfo().getPoolName());
+
+    CacheDirectiveInfo filter = new CacheDirectiveInfo.Builder().
+        setPool("pool").
+        build();
+    asyncCacheAdmin.listCacheDirectives(0, filter);
+    BatchedEntries<CacheDirectiveEntry> cacheDirectiveEntries = syncReturn(BatchedEntries.class);
+    assertEquals(new Path("/testCache.file"), cacheDirectiveEntries.get(0).getInfo().getPath());
+
+    CachePoolInfo pool = new CachePoolInfo("pool").setOwnerName("pool_user");
+    asyncCacheAdmin.modifyCachePool(pool);
+    syncReturn(null);
+
+    asyncCacheAdmin.listCachePools("");
+    cachePoolEntries = syncReturn(BatchedEntries.class);
+    assertEquals("pool_user", cachePoolEntries.get(0).getInfo().getOwnerName());
+
+    path = new CacheDirectiveInfo.Builder().
+        setPool("pool").
+        setPath(new Path("/testCache.file")).
+        setReplication((short) 2).
+        setId(1L).
+        build();
+    asyncCacheAdmin.modifyCacheDirective(path, EnumSet.of(CacheFlag.FORCE));
+    syncReturn(null);
+
+    asyncCacheAdmin.listCacheDirectives(0, filter);
+    cacheDirectiveEntries = syncReturn(BatchedEntries.class);
+    assertEquals(Short.valueOf((short) 2), cacheDirectiveEntries.get(0).getInfo().getReplication());
+
+    asyncCacheAdmin.removeCacheDirective(1L);
+    syncReturn(null);
+    asyncCacheAdmin.removeCachePool("pool");
+    syncReturn(null);
+  }
+}

+ 4 - 2
hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAsyncErasureCoding.java → hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncErasureCoding.java

@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.hdfs.server.federation.router;
+package org.apache.hadoop.hdfs.server.federation.router.async;
 
 
 import org.apache.hadoop.conf.Configuration;
@@ -32,6 +32,8 @@ import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster;
 import org.apache.hadoop.hdfs.server.federation.MockResolver;
 import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
+import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;
+import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer;
 import org.apache.hadoop.io.erasurecode.ECSchema;
 import org.apache.hadoop.ipc.CallerContext;
 import org.junit.After;
@@ -49,7 +51,7 @@ import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.NAMEN
 import static org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.DEFAULT_HEARTBEAT_INTERVAL_MS;
 import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT;
 import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT;
-import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.syncReturn;
+import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.syncReturn;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;

+ 2 - 2
hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/async/TestRouterAsyncNamenodeProtocol.java → hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncNamenodeProtocol.java

@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.hdfs.server.federation.async;
+package org.apache.hadoop.hdfs.server.federation.router.async;
 
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
@@ -27,7 +27,7 @@ import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.junit.Before;
 import org.junit.Test;
 
-import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.syncReturn;
+import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.syncReturn;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 

+ 4 - 2
hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAsyncQuota.java → hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncQuota.java

@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.hdfs.server.federation.router;
+package org.apache.hadoop.hdfs.server.federation.router.async;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -27,6 +27,8 @@ import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster;
 import org.apache.hadoop.hdfs.server.federation.MockResolver;
 import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
+import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;
+import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer;
 import org.apache.hadoop.ipc.CallerContext;
 import org.junit.After;
 import org.junit.AfterClass;
@@ -44,7 +46,7 @@ import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.NAMEN
 import static org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.DEFAULT_HEARTBEAT_INTERVAL_MS;
 import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT;
 import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT;
-import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.syncReturn;
+import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.syncReturn;
 import static org.junit.Assert.assertTrue;
 
 public class TestRouterAsyncQuota {

+ 7 - 2
hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAsyncRpcClient.java → hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncRpcClient.java

@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.hdfs.server.federation.router;
+package org.apache.hadoop.hdfs.server.federation.router.async;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -30,6 +30,11 @@ import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
 import org.apache.hadoop.hdfs.server.federation.metrics.FederationRPCMetrics;
 import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeContext;
 import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
+import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;
+import org.apache.hadoop.hdfs.server.federation.router.RemoteMethod;
+import org.apache.hadoop.hdfs.server.federation.router.RemoteParam;
+import org.apache.hadoop.hdfs.server.federation.router.Router;
+import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
 import org.apache.hadoop.ipc.CallerContext;
 import org.apache.hadoop.ipc.RetriableException;
@@ -54,7 +59,7 @@ import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.NAMEN
 import static org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.DEFAULT_HEARTBEAT_INTERVAL_MS;
 import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT;
 import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT;
-import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.syncReturn;
+import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.syncReturn;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;

+ 96 - 0
hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncRpcServer.java

@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router.async;
+
+import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
+import org.apache.hadoop.hdfs.server.federation.router.RemoteMethod;
+import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.syncReturn;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+/**
+ * Used to test the async functionality of {@link RouterRpcServer}.
+ */
+public class TestRouterAsyncRpcServer extends RouterAsyncProtocolTestBase {
+  private RouterRpcServer asyncRouterRpcServer;
+
+  @Before
+  public void setup() throws IOException {
+    asyncRouterRpcServer = getRouterAsyncRpcServer();
+  }
+
+  /**
+   * Test that the async RPC server can invoke a method at an available Namenode.
+   */
+  @Test
+  public void testInvokeAtAvailableNsAsync() throws Exception {
+    RemoteMethod method = new RemoteMethod("getStoragePolicies");
+    asyncRouterRpcServer.invokeAtAvailableNsAsync(method, BlockStoragePolicy[].class);
+    BlockStoragePolicy[] storagePolicies = syncReturn(BlockStoragePolicy[].class);
+    assertEquals(8, storagePolicies.length);
+  }
+
+  /**
+   * Test get create location async.
+   */
+  @Test
+  public void testGetCreateLocationAsync() throws Exception {
+    final List<RemoteLocation> locations =
+        asyncRouterRpcServer.getLocationsForPath("/testdir", true);
+    asyncRouterRpcServer.getCreateLocationAsync("/testdir", locations);
+    RemoteLocation remoteLocation = syncReturn(RemoteLocation.class);
+    assertNotNull(remoteLocation);
+    assertEquals(getNs0(), remoteLocation.getNameserviceId());
+  }
+
+  /**
+   * Test get datanode report async.
+   */
+  @Test
+  public void testGetDatanodeReportAsync() throws Exception {
+    asyncRouterRpcServer.getDatanodeReportAsync(
+        HdfsConstants.DatanodeReportType.ALL, true, 0);
+    DatanodeInfo[] datanodeInfos = syncReturn(DatanodeInfo[].class);
+    assertEquals(3, datanodeInfos.length);
+
+    // Get the namespace where the datanode is located
+    asyncRouterRpcServer.getDatanodeStorageReportMapAsync(HdfsConstants.DatanodeReportType.ALL);
+    Map<String, DatanodeStorageReport[]> map = syncReturn(Map.class);
+    assertEquals(1, map.size());
+    assertEquals(3, map.get(getNs0()).length);
+
+    DatanodeInfo[] slowDatanodeReport1 =
+        asyncRouterRpcServer.getSlowDatanodeReport(true, 0);
+
+    asyncRouterRpcServer.getSlowDatanodeReportAsync(true, 0);
+    DatanodeInfo[] slowDatanodeReport2 = syncReturn(DatanodeInfo[].class);
+    assertEquals(slowDatanodeReport1, slowDatanodeReport2);
+  }
+}

+ 11 - 103
hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAsyncSnapshot.java → hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncSnapshot.java

@@ -15,135 +15,43 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.hdfs.server.federation.router;
+package org.apache.hadoop.hdfs.server.federation.router.async;
 
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
 import org.apache.hadoop.hdfs.protocol.SnapshotDiffReportListing;
 import org.apache.hadoop.hdfs.protocol.SnapshotException;
 import org.apache.hadoop.hdfs.protocol.SnapshotStatus;
 import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
-import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster;
-import org.apache.hadoop.hdfs.server.federation.MockResolver;
-import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
-import org.apache.hadoop.ipc.CallerContext;
 import org.apache.hadoop.test.LambdaTestUtils;
-import org.junit.After;
-import org.junit.AfterClass;
 import org.junit.Before;
-import org.junit.BeforeClass;
 import org.junit.Test;
-import org.mockito.Mockito;
-
 import java.io.IOException;
-import java.util.concurrent.TimeUnit;
 
 import static org.apache.hadoop.hdfs.protocol.SnapshotDiffReport.DiffType.MODIFY;
-import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.NAMENODES;
-import static org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.DEFAULT_HEARTBEAT_INTERVAL_MS;
-import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT;
-import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT;
-import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.syncReturn;
+import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.syncReturn;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-public class TestRouterAsyncSnapshot {
-  private static Configuration routerConf;
-  /** Federated HDFS cluster. */
-  private static MiniRouterDFSCluster cluster;
-  private static String ns0;
 
-  /** Random Router for this federated cluster. */
-  private MiniRouterDFSCluster.RouterContext router;
+/**
+ * Used to test the functionality of {@link RouterAsyncSnapshot}.
+ */
+public class TestRouterAsyncSnapshot extends RouterAsyncProtocolTestBase {
+  private final String testFile = "/testdir/testSnapshot.file";
   private FileSystem routerFs;
-  private RouterRpcServer routerRpcServer;
   private RouterAsyncSnapshot asyncSnapshot;
 
-  @BeforeClass
-  public static void setUpCluster() throws Exception {
-    cluster = new MiniRouterDFSCluster(true, 1, 2,
-        DEFAULT_HEARTBEAT_INTERVAL_MS, 1000);
-    cluster.setNumDatanodesPerNameservice(3);
-
-    cluster.startCluster();
-
-    // Making one Namenode active per nameservice
-    if (cluster.isHighAvailability()) {
-      for (String ns : cluster.getNameservices()) {
-        cluster.switchToActive(ns, NAMENODES[0]);
-        cluster.switchToStandby(ns, NAMENODES[1]);
-      }
-    }
-    // Start routers with only an RPC service
-    routerConf = new RouterConfigBuilder()
-        .rpc()
-        .build();
-
-    // Reduce the number of RPC clients threads to overload the Router easy
-    routerConf.setInt(RBFConfigKeys.DFS_ROUTER_CLIENT_THREADS_SIZE, 1);
-    routerConf.setInt(DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT, 1);
-    routerConf.setInt(DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT, 1);
-    // We decrease the DN cache times to make the test faster
-    routerConf.setTimeDuration(
-        RBFConfigKeys.DN_REPORT_CACHE_EXPIRE, 1, TimeUnit.SECONDS);
-    cluster.addRouterOverrides(routerConf);
-    // Start routers with only an RPC service
-    cluster.startRouters();
-
-    // Register and verify all NNs with all routers
-    cluster.registerNamenodes();
-    cluster.waitNamenodeRegistration();
-    cluster.waitActiveNamespaces();
-    ns0 = cluster.getNameservices().get(0);
-  }
-
-  @AfterClass
-  public static void shutdownCluster() throws Exception {
-    if (cluster != null) {
-      cluster.shutdown();
-    }
-  }
-
   @Before
-  public void setUp() throws IOException {
-    router = cluster.getRandomRouter();
-    routerFs = router.getFileSystem();
-    routerRpcServer = router.getRouterRpcServer();
-    routerRpcServer.initAsyncThreadPool();
-    RouterAsyncRpcClient asyncRpcClient = new RouterAsyncRpcClient(
-        routerConf, router.getRouter(), routerRpcServer.getNamenodeResolver(),
-        routerRpcServer.getRPCMonitor(),
-        routerRpcServer.getRouterStateIdContext());
-    RouterRpcServer spy = Mockito.spy(routerRpcServer);
-    Mockito.when(spy.getRPCClient()).thenReturn(asyncRpcClient);
-    asyncSnapshot = new RouterAsyncSnapshot(spy);
-
-    // Create mock locations
-    MockResolver resolver = (MockResolver) router.getRouter().getSubclusterResolver();
-    resolver.addLocation("/", ns0, "/");
-    FsPermission permission = new FsPermission("705");
-    routerFs.mkdirs(new Path("/testdir"), permission);
+  public void setup() throws IOException {
+    routerFs = getRouterFs();
+    asyncSnapshot = new RouterAsyncSnapshot(getRouterAsyncRpcServer());
     FSDataOutputStream fsDataOutputStream = routerFs.create(
-        new Path("/testdir/testSnapshot.file"), true);
+        new Path(testFile), true);
     fsDataOutputStream.write(new byte[1024]);
     fsDataOutputStream.close();
   }
 
-  @After
-  public void tearDown() throws IOException {
-    // clear client context
-    CallerContext.setCurrent(null);
-    boolean delete = routerFs.delete(new Path("/testdir"));
-    assertTrue(delete);
-    if (routerFs != null) {
-      routerFs.close();
-    }
-  }
-
   @Test
   public void testRouterAsyncSnapshot() throws Exception {
     asyncSnapshot.allowSnapshot("/testdir");

+ 66 - 0
hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncStoragePolicy.java

@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router.async;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
+import org.junit.Before;
+import org.junit.Test;
+import java.io.IOException;
+
+import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.syncReturn;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+
+/**
+ * Used to test the functionality of {@link RouterAsyncStoragePolicy}.
+ */
+public class TestRouterAsyncStoragePolicy extends RouterAsyncProtocolTestBase {
+  private final String testfilePath = "/testdir/testAsyncStoragePolicy.file";
+  private RouterAsyncStoragePolicy asyncStoragePolicy;
+
+  @Before
+  public void setup() throws IOException {
+    asyncStoragePolicy = new RouterAsyncStoragePolicy(getRouterAsyncRpcServer());
+    FSDataOutputStream fsDataOutputStream = getRouterFs().create(
+        new Path(testfilePath), true);
+    fsDataOutputStream.write(new byte[1024]);
+    fsDataOutputStream.close();
+  }
+
+  @Test
+  public void testRouterAsyncStoragePolicy() throws Exception {
+    BlockStoragePolicy[] storagePolicies = getCluster().getNamenodes().get(0)
+        .getClient().getStoragePolicies();
+    asyncStoragePolicy.getStoragePolicies();
+    BlockStoragePolicy[] storagePoliciesAsync = syncReturn(BlockStoragePolicy[].class);
+    assertArrayEquals(storagePolicies, storagePoliciesAsync);
+
+    asyncStoragePolicy.getStoragePolicy(testfilePath);
+    BlockStoragePolicy blockStoragePolicy1 = syncReturn(BlockStoragePolicy.class);
+
+    asyncStoragePolicy.setStoragePolicy(testfilePath, "COLD");
+    syncReturn(null);
+    asyncStoragePolicy.getStoragePolicy(testfilePath);
+    BlockStoragePolicy blockStoragePolicy2 = syncReturn(BlockStoragePolicy.class);
+    assertNotEquals(blockStoragePolicy1, blockStoragePolicy2);
+    assertEquals("COLD", blockStoragePolicy2.getName());
+  }
+}

+ 2 - 2
hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/async/TestRouterAsyncUserProtocol.java → hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncUserProtocol.java

@@ -15,13 +15,13 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.hdfs.server.federation.async;
+package org.apache.hadoop.hdfs.server.federation.router.async;
 
 import org.apache.hadoop.security.UserGroupInformation;
 import org.junit.Before;
 import org.junit.Test;
 
-import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.syncReturn;
+import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.syncReturn;
 import static org.junit.Assert.assertArrayEquals;
 
 /**

+ 10 - 10
hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/AsyncClass.java → hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/utils/AsyncClass.java

@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.hdfs.server.federation.router.async;
+package org.apache.hadoop.hdfs.server.federation.router.async.utils;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -28,15 +28,15 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.function.Function;
 
-import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncApply;
-import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncCatch;
-import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncComplete;
-import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncCurrent;
-import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncFinally;
-import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncForEach;
-import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncReturn;
-import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncThrowException;
-import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncTry;
+import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncApply;
+import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncCatch;
+import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncComplete;
+import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncCurrent;
+import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncFinally;
+import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncForEach;
+import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncReturn;
+import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncThrowException;
+import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncTry;
 
 /**
  * AsyncClass demonstrates the conversion of synchronous methods

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/BaseClass.java → hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/utils/BaseClass.java

@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.hdfs.server.federation.router.async;
+package org.apache.hadoop.hdfs.server.federation.router.async.utils;
 
 import java.io.IOException;
 import java.util.List;

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/SyncClass.java → hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/utils/SyncClass.java

@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.hdfs.server.federation.router.async;
+package org.apache.hadoop.hdfs.server.federation.router.async.utils;
 
 import java.io.IOException;
 import java.util.ArrayList;

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestAsyncUtil.java → hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/utils/TestAsyncUtil.java

@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.hdfs.server.federation.router.async;
+package org.apache.hadoop.hdfs.server.federation.router.async.utils;
 
 import org.apache.hadoop.test.LambdaTestUtils;
 import org.apache.hadoop.util.Time;

+ 0 - 2
hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/security/TestRouterSecurityManager.java

@@ -132,9 +132,7 @@ public class TestRouterSecurityManager {
     // Cancel the delegation token
     securityManager.cancelDelegationToken(token);
 
-    String exceptionCause = "Renewal request for unknown token";
     exceptionRule.expect(SecretManager.InvalidToken.class);
-    exceptionRule.expectMessage(exceptionCause);
 
     // This throws an exception as token has been cancelled.
     securityManager.renewDelegationToken(token);