Переглянути джерело

HDFS-17650. [ARR] The router server-side rpc protocol PB supports asynchrony. (#7139). Contributed by hfutatzhanghb.

Co-authored-by: Jian Zhang <keepromise@apache.org>
Signed-off-by: He Xiaoqiao <hexiaoqiao@apache.org>
hfutatzhanghb 6 місяців тому
батько
коміт
2678e59c79
20 змінених файлів з 2554 додано та 124 видалено
  1. 54 0
      hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/protocolPB/AsyncRpcProtocolPBUtil.java
  2. 1769 0
      hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/protocolPB/RouterClientNamenodeProtocolServerSideTranslatorPB.java
  3. 59 0
      hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/protocolPB/RouterGetUserMappingsProtocolServerSideTranslatorPB.java
  4. 276 0
      hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/protocolPB/RouterNamenodeProtocolServerSideTranslatorPB.java
  5. 83 0
      hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/protocolPB/RouterRefreshUserMappingsProtocolServerSideTranslatorPB.java
  6. 5 1
      hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/NamenodeBeanMetrics.java
  7. 8 2
      hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/RBFMetrics.java
  8. 34 10
      hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
  9. 6 0
      hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncClientProtocol.java
  10. 5 4
      hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncRpcClient.java
  11. 5 0
      hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/utils/AsyncUtil.java
  12. 18 14
      hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java
  13. 2 1
      hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncProtocolTestBase.java
  14. 2 1
      hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncErasureCoding.java
  15. 2 1
      hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncQuota.java
  16. 76 0
      hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncRpc.java
  17. 2 1
      hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncRpcClient.java
  18. 66 0
      hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncRpcMultiDestination.java
  19. 78 85
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
  20. 4 4
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolServerSideTranslatorPB.java

+ 54 - 0
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/protocolPB/AsyncRpcProtocolPBUtil.java

@@ -20,16 +20,21 @@ package org.apache.hadoop.hdfs.protocolPB;
 
 import org.apache.hadoop.hdfs.server.federation.router.ThreadLocalContext;
 import org.apache.hadoop.hdfs.server.federation.router.async.utils.ApplyFunction;
+import org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.ipc.CallerContext;
 import org.apache.hadoop.ipc.Client;
 import org.apache.hadoop.ipc.ProtobufRpcEngine2;
+import org.apache.hadoop.ipc.ProtobufRpcEngineCallback2;
 import org.apache.hadoop.ipc.internal.ShadedProtobufHelper;
+import org.apache.hadoop.thirdparty.protobuf.Message;
 import org.apache.hadoop.util.concurrent.AsyncGet;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.Executor;
 
 import static org.apache.hadoop.hdfs.server.federation.router.async.utils.Async.warpCompletionException;
@@ -96,6 +101,45 @@ public final class AsyncRpcProtocolPBUtil {
     return asyncReturn(clazz);
   }
 
+  /**
+   * Asynchronously invokes an RPC call and applies a response transformation function
+   * to the result on server-side.
+   * @param req The IPC call encapsulating the RPC request on server-side.
+   * @param res The function to apply to the response of the RPC call on server-side.
+   * @param <T> Type of the call's result.
+   */
+  public static <T> void asyncRouterServer(ServerReq<T> req, ServerRes<T> res) {
+    final ProtobufRpcEngineCallback2 callback =
+        ProtobufRpcEngine2.Server.registerForDeferredResponse2();
+
+    CompletableFuture<Object> completableFuture =
+        CompletableFuture.completedFuture(null);
+    completableFuture.thenCompose(o -> {
+      try {
+        req.req();
+        return (CompletableFuture<T>) AsyncUtil.getAsyncUtilCompletableFuture();
+      } catch (Exception e) {
+        throw new CompletionException(e);
+      }
+    }).handle((result, e) -> {
+      LOG.debug("Async response, callback: {}, CallerContext: {}, result: [{}], exception: [{}]",
+          callback, CallerContext.getCurrent(), result, e);
+      if (e == null) {
+        Message value = null;
+        try {
+          value = res.res(result);
+        } catch (Exception re) {
+          callback.error(re);
+          return null;
+        }
+        callback.setResponse(value);
+      } else {
+        callback.error(e.getCause());
+      }
+      return null;
+    });
+  }
+
   /**
    * Sets the executor used for handling responses asynchronously within
    * the utility class.
@@ -105,4 +149,14 @@ public final class AsyncRpcProtocolPBUtil {
   public static void setWorker(Executor worker) {
     AsyncRpcProtocolPBUtil.worker = worker;
   }
+
+  @FunctionalInterface
+  interface ServerReq<T> {
+    T req() throws Exception;
+  }
+
+  @FunctionalInterface
+  interface ServerRes<T> {
+    Message res(T result) throws Exception;
+  }
 }

+ 1769 - 0
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/protocolPB/RouterClientNamenodeProtocolServerSideTranslatorPB.java

@@ -0,0 +1,1769 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.protocolPB;
+
+import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.Options;
+import org.apache.hadoop.fs.permission.FsCreateModes;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.ha.proto.HAServiceProtocolProtos;
+import org.apache.hadoop.hdfs.AddBlockFlag;
+import org.apache.hadoop.hdfs.protocol.BatchedDirectoryListing;
+import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
+import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.protocol.DirectoryListing;
+import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
+import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicyInfo;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.protocol.HdfsPartialListing;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.OpenFilesIterator;
+import org.apache.hadoop.hdfs.protocol.proto.AclProtos.GetAclStatusRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.AclProtos.GetAclStatusResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.AclProtos.ModifyAclEntriesRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.AclProtos.ModifyAclEntriesResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.AclProtos.RemoveAclEntriesRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.AclProtos.RemoveAclEntriesResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.AclProtos.RemoveAclRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.AclProtos.RemoveAclResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.AclProtos.RemoveDefaultAclRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.AclProtos.RemoveDefaultAclResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.AclProtos.SetAclRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.AclProtos.SetAclResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AbandonBlockRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AbandonBlockResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AddBlockRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AddBlockResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AddCacheDirectiveRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AddCacheDirectiveResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AddCachePoolRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AddCachePoolResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AllowSnapshotRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AllowSnapshotResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AppendRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AppendResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CheckAccessRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CheckAccessResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CompleteRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CompleteResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ConcatRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ConcatResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CreateRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CreateResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CreateSnapshotRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CreateSnapshotResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CreateSymlinkRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CreateSymlinkResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.DeleteRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.DeleteResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.DeleteSnapshotRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.DeleteSnapshotResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.DisallowSnapshotRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.DisallowSnapshotResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.FinalizeUpgradeRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.FinalizeUpgradeResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.FsyncRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.FsyncResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetAdditionalDatanodeRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetAdditionalDatanodeResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetBatchedListingRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetBatchedListingResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetBlockLocationsRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetBlockLocationsResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetContentSummaryRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetContentSummaryResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetCurrentEditLogTxidRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetCurrentEditLogTxidResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetDataEncryptionKeyRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetDataEncryptionKeyResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetDatanodeReportRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetDatanodeReportResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetDatanodeStorageReportRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetDatanodeStorageReportResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetEditsFromTxidRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetEditsFromTxidResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetEnclosingRootRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetEnclosingRootResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFileInfoRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFileInfoResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFileLinkInfoRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFileLinkInfoResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFsECBlockGroupStatsRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFsECBlockGroupStatsResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFsReplicatedBlockStatsRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFsReplicatedBlockStatsResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFsStatsResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFsStatusRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetLinkTargetRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetLinkTargetResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetListingRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetListingResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetLocatedFileInfoRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetLocatedFileInfoResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetPreferredBlockSizeRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetPreferredBlockSizeResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetQuotaUsageRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetQuotaUsageResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetServerDefaultsRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetServerDefaultsResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSlowDatanodeReportRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSlowDatanodeReportResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSnapshotDiffReportListingRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSnapshotDiffReportListingResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSnapshotDiffReportRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSnapshotDiffReportResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSnapshotListingRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSnapshotListingResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSnapshottableDirListingRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSnapshottableDirListingResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetStoragePoliciesRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetStoragePoliciesResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetStoragePolicyRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetStoragePolicyResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.HAServiceStateRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.HAServiceStateResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.IsFileClosedRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.IsFileClosedResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCacheDirectivesRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCacheDirectivesResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCachePoolsRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCachePoolsResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCorruptFileBlocksRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCorruptFileBlocksResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListOpenFilesRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListOpenFilesResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.MetaSaveRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.MetaSaveResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.MkdirsRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.MkdirsResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ModifyCacheDirectiveRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ModifyCacheDirectiveResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ModifyCachePoolRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ModifyCachePoolResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.MsyncRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.MsyncResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RecoverLeaseRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RecoverLeaseResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RefreshNodesRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RefreshNodesResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RemoveCacheDirectiveRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RemoveCacheDirectiveResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RemoveCachePoolRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RemoveCachePoolResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.Rename2RequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.Rename2ResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RenameRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RenameResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RenameSnapshotRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RenameSnapshotResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RenewLeaseRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RenewLeaseResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ReportBadBlocksRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ReportBadBlocksResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RestoreFailedStorageRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RestoreFailedStorageResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RollEditsRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RollEditsResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RollingUpgradeRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RollingUpgradeResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SatisfyStoragePolicyRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SatisfyStoragePolicyResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SaveNamespaceRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SaveNamespaceResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetBalancerBandwidthRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetBalancerBandwidthResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetOwnerRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetOwnerResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetPermissionRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetPermissionResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetQuotaRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetQuotaResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetReplicationRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetReplicationResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetSafeModeRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetSafeModeResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetStoragePolicyRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetStoragePolicyResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetTimesRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetTimesResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.TruncateRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.TruncateResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UnsetStoragePolicyRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UnsetStoragePolicyResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpdateBlockForPipelineRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpdateBlockForPipelineResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpdatePipelineRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpdatePipelineResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpgradeStatusRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpgradeStatusResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.CreateEncryptionZoneRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.CreateEncryptionZoneResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.GetEZForPathRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.GetEZForPathResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.ListEncryptionZonesRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.ListEncryptionZonesResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.ListReencryptionStatusRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.ListReencryptionStatusResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.ReencryptEncryptionZoneRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.ReencryptEncryptionZoneResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.AddErasureCodingPoliciesRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.AddErasureCodingPoliciesResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.DisableErasureCodingPolicyRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.DisableErasureCodingPolicyResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.EnableErasureCodingPolicyRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.EnableErasureCodingPolicyResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.GetECTopologyResultForPoliciesRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.GetECTopologyResultForPoliciesResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.GetErasureCodingCodecsRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.GetErasureCodingCodecsResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.GetErasureCodingPoliciesRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.GetErasureCodingPoliciesResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.GetErasureCodingPolicyRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.GetErasureCodingPolicyResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.RemoveErasureCodingPolicyRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.RemoveErasureCodingPolicyResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.SetErasureCodingPolicyRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.SetErasureCodingPolicyResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.UnsetErasureCodingPolicyRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.UnsetErasureCodingPolicyResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
+import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.GetXAttrsRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.GetXAttrsResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.ListXAttrsRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.ListXAttrsResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.RemoveXAttrRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.RemoveXAttrResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.SetXAttrRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.SetXAttrResponseProto;
+import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer;
+import org.apache.hadoop.io.EnumSetWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.security.proto.SecurityProtos.CancelDelegationTokenRequestProto;
+import org.apache.hadoop.security.proto.SecurityProtos.CancelDelegationTokenResponseProto;
+import org.apache.hadoop.security.proto.SecurityProtos.GetDelegationTokenRequestProto;
+import org.apache.hadoop.security.proto.SecurityProtos.GetDelegationTokenResponseProto;
+import org.apache.hadoop.security.proto.SecurityProtos.RenewDelegationTokenRequestProto;
+import org.apache.hadoop.security.proto.SecurityProtos.RenewDelegationTokenResponseProto;
+import org.apache.hadoop.thirdparty.protobuf.ByteString;
+import org.apache.hadoop.thirdparty.protobuf.ProtocolStringList;
+import org.apache.hadoop.thirdparty.protobuf.RpcController;
+import org.apache.hadoop.thirdparty.protobuf.ServiceException;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.hadoop.hdfs.protocolPB.AsyncRpcProtocolPBUtil.asyncRouterServer;
+
+public class RouterClientNamenodeProtocolServerSideTranslatorPB
+    extends ClientNamenodeProtocolServerSideTranslatorPB {
+
+  private final RouterRpcServer server;
+
+  public RouterClientNamenodeProtocolServerSideTranslatorPB(
+      ClientProtocol server) throws IOException {
+    super(server);
+    this.server = (RouterRpcServer) server;
+  }
+
+  @Override
+  public GetBlockLocationsResponseProto getBlockLocations(
+      RpcController controller, GetBlockLocationsRequestProto req) {
+    asyncRouterServer(() -> server.getBlockLocations(req.getSrc(), req.getOffset(),
+        req.getLength()),
+        b -> {
+          GetBlockLocationsResponseProto.Builder builder
+              = GetBlockLocationsResponseProto
+              .newBuilder();
+          if (b != null) {
+            builder.setLocations(PBHelperClient.convert(b)).build();
+          }
+          return builder.build();
+        });
+    return null;
+  }
+
+  @Override
+  public GetServerDefaultsResponseProto getServerDefaults(
+      RpcController controller, GetServerDefaultsRequestProto req) {
+    asyncRouterServer(server::getServerDefaults,
+        result -> GetServerDefaultsResponseProto.newBuilder()
+            .setServerDefaults(PBHelperClient.convert(result))
+            .build());
+    return null;
+  }
+
+
+  @Override
+  public CreateResponseProto create(RpcController controller, CreateRequestProto req) {
+    asyncRouterServer(() -> {
+      FsPermission masked = req.hasUnmasked() ?
+          FsCreateModes.create(PBHelperClient.convert(req.getMasked()),
+              PBHelperClient.convert(req.getUnmasked())) :
+          PBHelperClient.convert(req.getMasked());
+      return server.create(req.getSrc(),
+          masked, req.getClientName(),
+          PBHelperClient.convertCreateFlag(req.getCreateFlag()), req.getCreateParent(),
+          (short) req.getReplication(), req.getBlockSize(),
+          PBHelperClient.convertCryptoProtocolVersions(
+              req.getCryptoProtocolVersionList()),
+          req.getEcPolicyName(), req.getStoragePolicy());
+    }, result -> {
+        if (result != null) {
+          return CreateResponseProto.newBuilder().setFs(PBHelperClient.convert(result))
+              .build();
+        }
+        return VOID_CREATE_RESPONSE;
+      });
+    return null;
+  }
+
+  @Override
+  public AppendResponseProto append(RpcController controller,
+      AppendRequestProto req) {
+    asyncRouterServer(() -> {
+      EnumSetWritable<CreateFlag> flags = req.hasFlag() ?
+          PBHelperClient.convertCreateFlag(req.getFlag()) :
+          new EnumSetWritable<>(EnumSet.of(CreateFlag.APPEND));
+      return server.append(req.getSrc(),
+          req.getClientName(), flags);
+    }, result -> {
+        AppendResponseProto.Builder builder =
+            AppendResponseProto.newBuilder();
+        if (result.getLastBlock() != null) {
+          builder.setBlock(PBHelperClient.convertLocatedBlock(
+              result.getLastBlock()));
+        }
+        if (result.getFileStatus() != null) {
+          builder.setStat(PBHelperClient.convert(result.getFileStatus()));
+        }
+        return builder.build();
+      });
+    return null;
+  }
+
+  @Override
+  public SetReplicationResponseProto setReplication(
+      RpcController controller,
+      SetReplicationRequestProto req) {
+    asyncRouterServer(() ->
+            server.setReplication(req.getSrc(), (short) req.getReplication()),
+        result -> SetReplicationResponseProto.newBuilder().setResult(result).build());
+    return null;
+  }
+
+
+  @Override
+  public SetPermissionResponseProto setPermission(
+      RpcController controller,
+      SetPermissionRequestProto req) {
+    asyncRouterServer(() -> {
+      server.setPermission(req.getSrc(), PBHelperClient.convert(req.getPermission()));
+      return null;
+    }, result -> VOID_SET_PERM_RESPONSE);
+    return null;
+  }
+
+  @Override
+  public SetOwnerResponseProto setOwner(
+      RpcController controller,
+      SetOwnerRequestProto req) {
+    asyncRouterServer(() -> {
+      server.setOwner(req.getSrc(),
+          req.hasUsername() ? req.getUsername() : null,
+          req.hasGroupname() ? req.getGroupname() : null);
+      return null;
+    }, result -> VOID_SET_OWNER_RESPONSE);
+    return null;
+  }
+
+  @Override
+  public AbandonBlockResponseProto abandonBlock(
+      RpcController controller,
+      AbandonBlockRequestProto req) {
+    asyncRouterServer(() -> {
+      server.abandonBlock(PBHelperClient.convert(req.getB()), req.getFileId(),
+          req.getSrc(), req.getHolder());
+      return null;
+    }, result -> VOID_ADD_BLOCK_RESPONSE);
+    return null;
+  }
+
+  @Override
+  public AddBlockResponseProto addBlock(
+      RpcController controller,
+      AddBlockRequestProto req) {
+    asyncRouterServer(() -> {
+      List<HdfsProtos.DatanodeInfoProto> excl = req.getExcludeNodesList();
+      List<String> favor = req.getFavoredNodesList();
+      EnumSet<AddBlockFlag> flags =
+          PBHelperClient.convertAddBlockFlags(req.getFlagsList());
+      return server.addBlock(
+          req.getSrc(),
+          req.getClientName(),
+          req.hasPrevious() ? PBHelperClient.convert(req.getPrevious()) : null,
+          (excl == null || excl.size() == 0) ? null : PBHelperClient.convert(excl
+              .toArray(new HdfsProtos.DatanodeInfoProto[excl.size()])), req.getFileId(),
+          (favor == null || favor.size() == 0) ? null : favor
+              .toArray(new String[favor.size()]),
+          flags);
+    }, result -> AddBlockResponseProto.newBuilder()
+        .setBlock(PBHelperClient.convertLocatedBlock(result)).build());
+    return null;
+  }
+
+  @Override
+  public GetAdditionalDatanodeResponseProto getAdditionalDatanode(
+      RpcController controller, GetAdditionalDatanodeRequestProto req) {
+    asyncRouterServer(() -> {
+      List<HdfsProtos.DatanodeInfoProto> existingList = req.getExistingsList();
+      List<String> existingStorageIDsList = req.getExistingStorageUuidsList();
+      List<HdfsProtos.DatanodeInfoProto> excludesList = req.getExcludesList();
+      LocatedBlock result = server.getAdditionalDatanode(req.getSrc(),
+          req.getFileId(), PBHelperClient.convert(req.getBlk()),
+          PBHelperClient.convert(existingList.toArray(
+              new HdfsProtos.DatanodeInfoProto[existingList.size()])),
+          existingStorageIDsList.toArray(
+              new String[existingStorageIDsList.size()]),
+          PBHelperClient.convert(excludesList.toArray(
+              new HdfsProtos.DatanodeInfoProto[excludesList.size()])),
+          req.getNumAdditionalNodes(), req.getClientName());
+      return result;
+    }, result -> GetAdditionalDatanodeResponseProto.newBuilder()
+        .setBlock(
+            PBHelperClient.convertLocatedBlock(result))
+        .build());
+    return null;
+  }
+
+  @Override
+  public CompleteResponseProto complete(
+      RpcController controller,
+      CompleteRequestProto req) {
+    asyncRouterServer(() -> {
+      boolean result =
+          server.complete(req.getSrc(), req.getClientName(),
+              req.hasLast() ? PBHelperClient.convert(req.getLast()) : null,
+              req.hasFileId() ? req.getFileId() : HdfsConstants.GRANDFATHER_INODE_ID);
+      return result;
+    }, result -> CompleteResponseProto.newBuilder().setResult(result).build());
+    return null;
+  }
+
+  @Override
+  public ReportBadBlocksResponseProto reportBadBlocks(
+      RpcController controller,
+      ReportBadBlocksRequestProto req) {
+    asyncRouterServer(() -> {
+      List<HdfsProtos.LocatedBlockProto> bl = req.getBlocksList();
+      server.reportBadBlocks(PBHelperClient.convertLocatedBlocks(
+          bl.toArray(new HdfsProtos.LocatedBlockProto[bl.size()])));
+      return null;
+    }, result -> VOID_REP_BAD_BLOCK_RESPONSE);
+    return null;
+  }
+
+  @Override
+  public ConcatResponseProto concat(
+      RpcController controller,
+      ConcatRequestProto req) {
+    asyncRouterServer(() -> {
+      List<String> srcs = req.getSrcsList();
+      server.concat(req.getTrg(), srcs.toArray(new String[srcs.size()]));
+      return null;
+    }, result -> VOID_CONCAT_RESPONSE);
+    return null;
+  }
+
+  @Override
+  public RenameResponseProto rename(
+      RpcController controller,
+      RenameRequestProto req) {
+    asyncRouterServer(() -> {
+      return server.rename(req.getSrc(), req.getDst());
+    }, result -> RenameResponseProto.newBuilder().setResult(result).build());
+    return null;
+  }
+
+  @Override
+  public Rename2ResponseProto rename2(
+      RpcController controller,
+      Rename2RequestProto req) {
+    asyncRouterServer(() -> {
+      // resolve rename options
+      ArrayList<Options.Rename> optionList = new ArrayList<Options.Rename>();
+      if (req.getOverwriteDest()) {
+        optionList.add(Options.Rename.OVERWRITE);
+      }
+      if (req.hasMoveToTrash() && req.getMoveToTrash()) {
+        optionList.add(Options.Rename.TO_TRASH);
+      }
+
+      if (optionList.isEmpty()) {
+        optionList.add(Options.Rename.NONE);
+      }
+      server.rename2(req.getSrc(), req.getDst(),
+          optionList.toArray(new Options.Rename[optionList.size()]));
+      return null;
+    }, result -> VOID_RENAME2_RESPONSE);
+    return null;
+  }
+
+  @Override
+  public TruncateResponseProto truncate(
+      RpcController controller,
+      TruncateRequestProto req) {
+    asyncRouterServer(() -> server.truncate(req.getSrc(), req.getNewLength(),
+        req.getClientName()),
+        result -> TruncateResponseProto.newBuilder().setResult(result).build());
+    return null;
+  }
+
+  @Override
+  public DeleteResponseProto delete(
+      RpcController controller,
+      DeleteRequestProto req) {
+    asyncRouterServer(() -> server.delete(req.getSrc(), req.getRecursive()),
+        result -> DeleteResponseProto.newBuilder().setResult(result).build());
+    return null;
+  }
+
+  @Override
+  public MkdirsResponseProto mkdirs(
+      RpcController controller,
+      MkdirsRequestProto req) {
+    asyncRouterServer(() -> {
+      FsPermission masked = req.hasUnmasked() ?
+          FsCreateModes.create(PBHelperClient.convert(req.getMasked()),
+              PBHelperClient.convert(req.getUnmasked())) :
+          PBHelperClient.convert(req.getMasked());
+      boolean result = server.mkdirs(req.getSrc(), masked,
+          req.getCreateParent());
+      return result;
+    }, result -> MkdirsResponseProto.newBuilder().setResult(result).build());
+    return null;
+  }
+
+  @Override
+  public GetListingResponseProto getListing(
+      RpcController controller,
+      GetListingRequestProto req) {
+    asyncRouterServer(() -> {
+      DirectoryListing result = server.getListing(
+          req.getSrc(), req.getStartAfter().toByteArray(),
+          req.getNeedLocation());
+      return result;
+    }, result -> {
+        if (result != null) {
+          return GetListingResponseProto.newBuilder().setDirList(
+                  PBHelperClient.convert(result)).build();
+        } else {
+          return VOID_GETLISTING_RESPONSE;
+        }
+      });
+    return null;
+  }
+
+  @Override
+  public GetBatchedListingResponseProto getBatchedListing(
+      RpcController controller,
+      GetBatchedListingRequestProto request) {
+    asyncRouterServer(() -> {
+      BatchedDirectoryListing result = server.getBatchedListing(
+          request.getPathsList().toArray(new String[]{}),
+          request.getStartAfter().toByteArray(),
+          request.getNeedLocation());
+      return result;
+    }, result -> {
+        if (result != null) {
+          GetBatchedListingResponseProto.Builder builder =
+              GetBatchedListingResponseProto.newBuilder();
+          for (HdfsPartialListing partialListing : result.getListings()) {
+            HdfsProtos.BatchedDirectoryListingProto.Builder listingBuilder =
+                HdfsProtos.BatchedDirectoryListingProto.newBuilder();
+            if (partialListing.getException() != null) {
+              RemoteException ex = partialListing.getException();
+              HdfsProtos.RemoteExceptionProto.Builder rexBuilder =
+                  HdfsProtos.RemoteExceptionProto.newBuilder();
+              rexBuilder.setClassName(ex.getClassName());
+              if (ex.getMessage() != null) {
+                rexBuilder.setMessage(ex.getMessage());
+              }
+              listingBuilder.setException(rexBuilder.build());
+            } else {
+              for (HdfsFileStatus f : partialListing.getPartialListing()) {
+                listingBuilder.addPartialListing(PBHelperClient.convert(f));
+              }
+            }
+            listingBuilder.setParentIdx(partialListing.getParentIdx());
+            builder.addListings(listingBuilder);
+          }
+          builder.setHasMore(result.hasMore());
+          builder.setStartAfter(ByteString.copyFrom(result.getStartAfter()));
+          return builder.build();
+        } else {
+          return VOID_GETBATCHEDLISTING_RESPONSE;
+        }
+      });
+    return null;
+  }
+
+  @Override
+  public RenewLeaseResponseProto renewLease(
+      RpcController controller,
+      RenewLeaseRequestProto req) {
+    asyncRouterServer(() -> {
+      server.renewLease(req.getClientName(), req.getNamespacesList());
+      return null;
+    }, result -> VOID_RENEWLEASE_RESPONSE);
+    return null;
+  }
+
+  @Override
+  public RecoverLeaseResponseProto recoverLease(
+      RpcController controller,
+      RecoverLeaseRequestProto req) {
+    asyncRouterServer(() -> server.recoverLease(req.getSrc(), req.getClientName()),
+        result -> RecoverLeaseResponseProto.newBuilder()
+            .setResult(result).build());
+    return null;
+  }
+
+  @Override
+  public RestoreFailedStorageResponseProto restoreFailedStorage(
+      RpcController controller, RestoreFailedStorageRequestProto req) {
+    asyncRouterServer(() -> server.restoreFailedStorage(req.getArg()),
+        result -> RestoreFailedStorageResponseProto.newBuilder().setResult(result)
+            .build());
+    return null;
+  }
+
+  @Override
+  public GetFsStatsResponseProto getFsStats(
+      RpcController controller,
+      GetFsStatusRequestProto req) {
+    asyncRouterServer(server::getStats, PBHelperClient::convert);
+    return null;
+  }
+
+  @Override
+  public GetFsReplicatedBlockStatsResponseProto getFsReplicatedBlockStats(
+      RpcController controller, GetFsReplicatedBlockStatsRequestProto request) {
+    asyncRouterServer(server::getReplicatedBlockStats, PBHelperClient::convert);
+    return null;
+  }
+
+  @Override
+  public GetFsECBlockGroupStatsResponseProto getFsECBlockGroupStats(
+      RpcController controller, GetFsECBlockGroupStatsRequestProto request) {
+    asyncRouterServer(server::getECBlockGroupStats, PBHelperClient::convert);
+    return null;
+  }
+
+  @Override
+  public GetDatanodeReportResponseProto getDatanodeReport(
+      RpcController controller, GetDatanodeReportRequestProto req) {
+    asyncRouterServer(() -> server.getDatanodeReport(PBHelperClient.convert(req.getType())),
+        result -> {
+          List<? extends HdfsProtos.DatanodeInfoProto> re = PBHelperClient.convert(result);
+          return GetDatanodeReportResponseProto.newBuilder()
+              .addAllDi(re).build();
+        });
+    return null;
+  }
+
+  @Override
+  public GetDatanodeStorageReportResponseProto getDatanodeStorageReport(
+      RpcController controller, GetDatanodeStorageReportRequestProto req) {
+    asyncRouterServer(() -> server.getDatanodeStorageReport(PBHelperClient.convert(req.getType())),
+        result -> {
+          List<ClientNamenodeProtocolProtos.DatanodeStorageReportProto> reports =
+              PBHelperClient.convertDatanodeStorageReports(result);
+          return GetDatanodeStorageReportResponseProto.newBuilder()
+              .addAllDatanodeStorageReports(reports)
+              .build();
+        });
+    return null;
+  }
+
+  @Override
+  public GetPreferredBlockSizeResponseProto getPreferredBlockSize(
+      RpcController controller, GetPreferredBlockSizeRequestProto req) {
+    asyncRouterServer(() -> server.getPreferredBlockSize(req.getFilename()),
+        result -> GetPreferredBlockSizeResponseProto.newBuilder().setBsize(result)
+            .build());
+    return null;
+  }
+
+  @Override
+  public SetSafeModeResponseProto setSafeMode(
+      RpcController controller,
+      SetSafeModeRequestProto req) {
+    asyncRouterServer(() -> server.setSafeMode(PBHelperClient.convert(req.getAction()),
+        req.getChecked()),
+        result -> SetSafeModeResponseProto.newBuilder().setResult(result).build());
+    return null;
+  }
+
+  @Override
+  public SaveNamespaceResponseProto saveNamespace(
+      RpcController controller,
+      SaveNamespaceRequestProto req) {
+    asyncRouterServer(() -> {
+      final long timeWindow = req.hasTimeWindow() ? req.getTimeWindow() : 0;
+      final long txGap = req.hasTxGap() ? req.getTxGap() : 0;
+      return server.saveNamespace(timeWindow, txGap);
+    }, result -> SaveNamespaceResponseProto.newBuilder().setSaved(result).build());
+    return null;
+  }
+
+  @Override
+  public RollEditsResponseProto rollEdits(
+      RpcController controller,
+      RollEditsRequestProto request) {
+    asyncRouterServer(server::rollEdits,
+        txid -> RollEditsResponseProto.newBuilder()
+            .setNewSegmentTxId(txid)
+            .build());
+    return null;
+  }
+
+
+  @Override
+  public RefreshNodesResponseProto refreshNodes(
+      RpcController controller,
+      RefreshNodesRequestProto req) {
+    asyncRouterServer(() -> {
+      server.refreshNodes();
+      return null;
+    }, result -> VOID_REFRESHNODES_RESPONSE);
+    return null;
+  }
+
+  @Override
+  public FinalizeUpgradeResponseProto finalizeUpgrade(
+      RpcController controller,
+      FinalizeUpgradeRequestProto req) {
+    asyncRouterServer(() -> {
+      server.finalizeUpgrade();
+      return null;
+    }, result -> VOID_REFRESHNODES_RESPONSE);
+    return null;
+  }
+
+  @Override
+  public UpgradeStatusResponseProto upgradeStatus(
+      RpcController controller, UpgradeStatusRequestProto req) {
+    asyncRouterServer(server::upgradeStatus,
+        result -> {
+          UpgradeStatusResponseProto.Builder b =
+              UpgradeStatusResponseProto.newBuilder();
+          b.setUpgradeFinalized(result);
+          return b.build();
+        });
+    return null;
+  }
+
+  @Override
+  public RollingUpgradeResponseProto rollingUpgrade(
+      RpcController controller,
+      RollingUpgradeRequestProto req) {
+    asyncRouterServer(() ->
+            server.rollingUpgrade(PBHelperClient.convert(req.getAction())),
+        info -> {
+          final RollingUpgradeResponseProto.Builder b =
+              RollingUpgradeResponseProto.newBuilder();
+          if (info != null) {
+            b.setRollingUpgradeInfo(PBHelperClient.convert(info));
+          }
+          return b.build();
+        });
+    return null;
+  }
+
+  @Override
+  public ListCorruptFileBlocksResponseProto listCorruptFileBlocks(
+      RpcController controller, ListCorruptFileBlocksRequestProto req) {
+    asyncRouterServer(() -> server.listCorruptFileBlocks(
+        req.getPath(), req.hasCookie() ? req.getCookie(): null),
+        result -> ListCorruptFileBlocksResponseProto.newBuilder()
+            .setCorrupt(PBHelperClient.convert(result))
+            .build());
+    return null;
+  }
+
+  @Override
+  public MetaSaveResponseProto metaSave(
+      RpcController controller,
+      MetaSaveRequestProto req) {
+    asyncRouterServer(() -> {
+      server.metaSave(req.getFilename());
+      return null;
+    }, result -> VOID_METASAVE_RESPONSE);
+    return null;
+  }
+
+  @Override
+  public GetFileInfoResponseProto getFileInfo(
+      RpcController controller,
+      GetFileInfoRequestProto req) {
+    asyncRouterServer(() -> server.getFileInfo(req.getSrc()),
+        result -> {
+          if (result != null) {
+            return GetFileInfoResponseProto.newBuilder().setFs(
+                PBHelperClient.convert(result)).build();
+          }
+          return VOID_GETFILEINFO_RESPONSE;
+        });
+    return null;
+  }
+
+  @Override
+  public GetLocatedFileInfoResponseProto getLocatedFileInfo(
+      RpcController controller, GetLocatedFileInfoRequestProto req) {
+    asyncRouterServer(() -> server.getLocatedFileInfo(req.getSrc(),
+        req.getNeedBlockToken()),
+        result -> {
+          if (result != null) {
+            return GetLocatedFileInfoResponseProto.newBuilder().setFs(
+                PBHelperClient.convert(result)).build();
+          }
+          return VOID_GETLOCATEDFILEINFO_RESPONSE;
+        });
+    return null;
+  }
+
+  @Override
+  public GetFileLinkInfoResponseProto getFileLinkInfo(
+      RpcController controller,
+      GetFileLinkInfoRequestProto req) {
+    asyncRouterServer(() -> server.getFileLinkInfo(req.getSrc()),
+        result -> {
+          if (result != null) {
+            return GetFileLinkInfoResponseProto.newBuilder().setFs(
+                PBHelperClient.convert(result)).build();
+          } else {
+            return VOID_GETFILELINKINFO_RESPONSE;
+          }
+        });
+    return null;
+  }
+
+  @Override
+  public GetContentSummaryResponseProto getContentSummary(
+      RpcController controller, GetContentSummaryRequestProto req) {
+    asyncRouterServer(() -> server.getContentSummary(req.getPath()),
+        result -> GetContentSummaryResponseProto.newBuilder()
+            .setSummary(PBHelperClient.convert(result)).build());
+    return null;
+  }
+
+  @Override
+  public SetQuotaResponseProto setQuota(
+      RpcController controller,
+      SetQuotaRequestProto req) {
+    asyncRouterServer(() -> {
+      server.setQuota(req.getPath(), req.getNamespaceQuota(),
+          req.getStoragespaceQuota(),
+          req.hasStorageType() ?
+              PBHelperClient.convertStorageType(req.getStorageType()): null);
+      return null;
+    }, result -> VOID_SETQUOTA_RESPONSE);
+    return null;
+  }
+
+  @Override
+  public FsyncResponseProto fsync(
+      RpcController controller,
+      FsyncRequestProto req) {
+    asyncRouterServer(() -> {
+      server.fsync(req.getSrc(), req.getFileId(),
+          req.getClient(), req.getLastBlockLength());
+      return null;
+    }, result -> VOID_FSYNC_RESPONSE);
+    return null;
+  }
+
+  @Override
+  public SetTimesResponseProto setTimes(
+      RpcController controller,
+      SetTimesRequestProto req) {
+    asyncRouterServer(() -> {
+      server.setTimes(req.getSrc(), req.getMtime(), req.getAtime());
+      return null;
+    }, result -> VOID_SETTIMES_RESPONSE);
+    return null;
+  }
+
+  @Override
+  public CreateSymlinkResponseProto createSymlink(
+      RpcController controller,
+      CreateSymlinkRequestProto req) {
+    asyncRouterServer(() -> {
+      server.createSymlink(req.getTarget(), req.getLink(),
+          PBHelperClient.convert(req.getDirPerm()), req.getCreateParent());
+      return null;
+    }, result -> VOID_CREATESYMLINK_RESPONSE);
+    return null;
+  }
+
+  @Override
+  public GetLinkTargetResponseProto getLinkTarget(
+      RpcController controller,
+      GetLinkTargetRequestProto req) {
+    asyncRouterServer(() -> server.getLinkTarget(req.getPath()),
+        result -> {
+          GetLinkTargetResponseProto.Builder builder =
+              GetLinkTargetResponseProto
+                  .newBuilder();
+          if (result != null) {
+            builder.setTargetPath(result);
+          }
+          return builder.build();
+        });
+    return null;
+  }
+
+  @Override
+  public UpdateBlockForPipelineResponseProto updateBlockForPipeline(
+      RpcController controller, UpdateBlockForPipelineRequestProto req) {
+    asyncRouterServer(() -> server.updateBlockForPipeline(PBHelperClient.convert(req.getBlock()),
+        req.getClientName()),
+        result -> {
+          HdfsProtos.LocatedBlockProto res = PBHelperClient.convertLocatedBlock(result);
+          return UpdateBlockForPipelineResponseProto.newBuilder().setBlock(res)
+              .build();
+        });
+    return null;
+  }
+
+  @Override
+  public UpdatePipelineResponseProto updatePipeline(
+      RpcController controller,
+      UpdatePipelineRequestProto req) {
+    asyncRouterServer(() -> {
+      List<HdfsProtos.DatanodeIDProto> newNodes = req.getNewNodesList();
+      List<String> newStorageIDs = req.getStorageIDsList();
+      server.updatePipeline(req.getClientName(),
+          PBHelperClient.convert(req.getOldBlock()),
+          PBHelperClient.convert(req.getNewBlock()),
+          PBHelperClient.convert(newNodes.toArray(new HdfsProtos.DatanodeIDProto[newNodes.size()])),
+          newStorageIDs.toArray(new String[newStorageIDs.size()]));
+      return null;
+    }, result -> VOID_UPDATEPIPELINE_RESPONSE);
+    return null;
+  }
+
+  @Override
+  public GetDelegationTokenResponseProto getDelegationToken(
+      RpcController controller, GetDelegationTokenRequestProto req) {
+    asyncRouterServer(() -> server
+            .getDelegationToken(new Text(req.getRenewer())),
+        token -> {
+          GetDelegationTokenResponseProto.Builder rspBuilder =
+              GetDelegationTokenResponseProto.newBuilder();
+          if (token != null) {
+            rspBuilder.setToken(PBHelperClient.convert(token));
+          }
+          return rspBuilder.build();
+        });
+    return null;
+  }
+
+  @Override
+  public RenewDelegationTokenResponseProto renewDelegationToken(
+      RpcController controller, RenewDelegationTokenRequestProto req) {
+    asyncRouterServer(() -> server.renewDelegationToken(PBHelperClient
+            .convertDelegationToken(req.getToken())),
+        result -> RenewDelegationTokenResponseProto.newBuilder()
+            .setNewExpiryTime(result).build());
+    return null;
+  }
+
+  @Override
+  public CancelDelegationTokenResponseProto cancelDelegationToken(
+      RpcController controller, CancelDelegationTokenRequestProto req) {
+    asyncRouterServer(() -> {
+      server.cancelDelegationToken(PBHelperClient.convertDelegationToken(req
+          .getToken()));
+      return null;
+    }, result -> VOID_CANCELDELEGATIONTOKEN_RESPONSE);
+    return null;
+  }
+
+  @Override
+  public SetBalancerBandwidthResponseProto setBalancerBandwidth(
+      RpcController controller, SetBalancerBandwidthRequestProto req) {
+    asyncRouterServer(() -> {
+      server.setBalancerBandwidth(req.getBandwidth());
+      return null;
+    }, result -> VOID_SETBALANCERBANDWIDTH_RESPONSE);
+    return null;
+  }
+
+  @Override
+  public GetDataEncryptionKeyResponseProto getDataEncryptionKey(
+      RpcController controller, GetDataEncryptionKeyRequestProto request) {
+    asyncRouterServer(server::getDataEncryptionKey, encryptionKey -> {
+      GetDataEncryptionKeyResponseProto.Builder builder =
+          GetDataEncryptionKeyResponseProto.newBuilder();
+      if (encryptionKey != null) {
+        builder.setDataEncryptionKey(PBHelperClient.convert(encryptionKey));
+      }
+      return builder.build();
+    });
+    return null;
+  }
+
+  @Override
+  public CreateSnapshotResponseProto createSnapshot(
+      RpcController controller,
+      CreateSnapshotRequestProto req) throws ServiceException {
+    asyncRouterServer(() -> server.createSnapshot(req.getSnapshotRoot(),
+        req.hasSnapshotName()? req.getSnapshotName(): null),
+        snapshotPath -> {
+          final CreateSnapshotResponseProto.Builder builder
+              = CreateSnapshotResponseProto.newBuilder();
+          if (snapshotPath != null) {
+            builder.setSnapshotPath(snapshotPath);
+          }
+          return builder.build();
+        });
+    return null;
+  }
+
+  @Override
+  public DeleteSnapshotResponseProto deleteSnapshot(
+      RpcController controller,
+      DeleteSnapshotRequestProto req) {
+    asyncRouterServer(() -> {
+      server.deleteSnapshot(req.getSnapshotRoot(), req.getSnapshotName());
+      return null;
+    }, result -> VOID_DELETE_SNAPSHOT_RESPONSE);
+    return null;
+  }
+
+  @Override
+  public AllowSnapshotResponseProto allowSnapshot(
+      RpcController controller,
+      AllowSnapshotRequestProto req) {
+    asyncRouterServer(() -> {
+      server.allowSnapshot(req.getSnapshotRoot());
+      return null;
+    }, result -> VOID_ALLOW_SNAPSHOT_RESPONSE);
+    return null;
+  }
+
+  @Override
+  public DisallowSnapshotResponseProto disallowSnapshot(
+      RpcController controller,
+      DisallowSnapshotRequestProto req) {
+    asyncRouterServer(() -> {
+      server.disallowSnapshot(req.getSnapshotRoot());
+      return null;
+    }, result -> VOID_DISALLOW_SNAPSHOT_RESPONSE);
+    return null;
+  }
+
+  @Override
+  public RenameSnapshotResponseProto renameSnapshot(
+      RpcController controller,
+      RenameSnapshotRequestProto request) {
+    asyncRouterServer(() -> {
+      server.renameSnapshot(request.getSnapshotRoot(),
+          request.getSnapshotOldName(), request.getSnapshotNewName());
+      return null;
+    }, result -> VOID_RENAME_SNAPSHOT_RESPONSE);
+    return null;
+  }
+
+  @Override
+  public GetSnapshottableDirListingResponseProto getSnapshottableDirListing(
+      RpcController controller, GetSnapshottableDirListingRequestProto request) {
+    asyncRouterServer(server::getSnapshottableDirListing,
+        result -> {
+          if (result != null) {
+            return GetSnapshottableDirListingResponseProto.newBuilder().
+                setSnapshottableDirList(PBHelperClient.convert(result)).build();
+          } else {
+            return NULL_GET_SNAPSHOTTABLE_DIR_LISTING_RESPONSE;
+          }
+        });
+    return null;
+  }
+
+  @Override
+  public GetSnapshotListingResponseProto getSnapshotListing(
+      RpcController controller, GetSnapshotListingRequestProto request) {
+    asyncRouterServer(() -> server
+            .getSnapshotListing(request.getSnapshotRoot()),
+        result -> {
+          if (result != null) {
+            return GetSnapshotListingResponseProto.newBuilder().
+                setSnapshotList(PBHelperClient.convert(result)).build();
+          } else {
+            return NULL_GET_SNAPSHOT_LISTING_RESPONSE;
+          }
+        });
+    return null;
+  }
+
+  @Override
+  public GetSnapshotDiffReportResponseProto getSnapshotDiffReport(
+      RpcController controller, GetSnapshotDiffReportRequestProto request) {
+    asyncRouterServer(() -> server.getSnapshotDiffReport(
+        request.getSnapshotRoot(), request.getFromSnapshot(),
+        request.getToSnapshot()),
+        report -> GetSnapshotDiffReportResponseProto.newBuilder()
+            .setDiffReport(PBHelperClient.convert(report)).build());
+    return null;
+  }
+
+  @Override
+  public GetSnapshotDiffReportListingResponseProto getSnapshotDiffReportListing(
+      RpcController controller,
+      GetSnapshotDiffReportListingRequestProto request) {
+    asyncRouterServer(() -> server
+            .getSnapshotDiffReportListing(request.getSnapshotRoot(),
+                request.getFromSnapshot(), request.getToSnapshot(),
+                request.getCursor().getStartPath().toByteArray(),
+                request.getCursor().getIndex()),
+        report -> GetSnapshotDiffReportListingResponseProto.newBuilder()
+            .setDiffReport(PBHelperClient.convert(report)).build());
+    return null;
+  }
+
+  @Override
+  public IsFileClosedResponseProto isFileClosed(
+      RpcController controller, IsFileClosedRequestProto request) {
+    asyncRouterServer(() -> server.isFileClosed(request.getSrc()),
+        result -> IsFileClosedResponseProto.newBuilder()
+            .setResult(result).build());
+    return null;
+  }
+
+  @Override
+  public AddCacheDirectiveResponseProto addCacheDirective(
+      RpcController controller, AddCacheDirectiveRequestProto request) {
+    asyncRouterServer(() -> server.addCacheDirective(
+        PBHelperClient.convert(request.getInfo()),
+        PBHelperClient.convertCacheFlags(request.getCacheFlags())),
+        id -> AddCacheDirectiveResponseProto.newBuilder().
+            setId(id).build());
+    return null;
+  }
+
+  @Override
+  public ModifyCacheDirectiveResponseProto modifyCacheDirective(
+      RpcController controller, ModifyCacheDirectiveRequestProto request) {
+    asyncRouterServer(() -> {
+      server.modifyCacheDirective(
+          PBHelperClient.convert(request.getInfo()),
+          PBHelperClient.convertCacheFlags(request.getCacheFlags()));
+      return null;
+    }, result -> ModifyCacheDirectiveResponseProto.newBuilder().build());
+    return null;
+  }
+
+  @Override
+  public RemoveCacheDirectiveResponseProto removeCacheDirective(
+      RpcController controller,
+      RemoveCacheDirectiveRequestProto request) {
+    asyncRouterServer(() -> {
+      server.removeCacheDirective(request.getId());
+      return null;
+    }, result -> RemoveCacheDirectiveResponseProto.
+        newBuilder().build());
+    return null;
+  }
+
+  @Override
+  public ListCacheDirectivesResponseProto listCacheDirectives(
+      RpcController controller, ListCacheDirectivesRequestProto request) {
+    asyncRouterServer(() -> {
+      CacheDirectiveInfo filter =
+          PBHelperClient.convert(request.getFilter());
+      return  server.listCacheDirectives(request.getPrevId(), filter);
+    }, entries -> {
+        ListCacheDirectivesResponseProto.Builder builder =
+            ListCacheDirectivesResponseProto.newBuilder();
+        builder.setHasMore(entries.hasMore());
+        for (int i=0, n=entries.size(); i<n; i++) {
+          builder.addElements(PBHelperClient.convert(entries.get(i)));
+        }
+        return builder.build();
+      });
+    return null;
+  }
+
+  @Override
+  public AddCachePoolResponseProto addCachePool(
+      RpcController controller,
+      AddCachePoolRequestProto request) {
+    asyncRouterServer(() -> {
+      server.addCachePool(PBHelperClient.convert(request.getInfo()));
+      return null;
+    }, result -> AddCachePoolResponseProto.newBuilder().build());
+    return null;
+  }
+
+  @Override
+  public ModifyCachePoolResponseProto modifyCachePool(
+      RpcController controller,
+      ModifyCachePoolRequestProto request) {
+    asyncRouterServer(() -> {
+      server.modifyCachePool(PBHelperClient.convert(request.getInfo()));
+      return null;
+    }, result -> ModifyCachePoolResponseProto.newBuilder().build());
+    return null;
+  }
+
+  @Override
+  public RemoveCachePoolResponseProto removeCachePool(
+      RpcController controller,
+      RemoveCachePoolRequestProto request) {
+    asyncRouterServer(() -> {
+      server.removeCachePool(request.getPoolName());
+      return null;
+    }, result -> RemoveCachePoolResponseProto.newBuilder().build());
+    return null;
+  }
+
+  @Override
+  public ListCachePoolsResponseProto listCachePools(
+      RpcController controller,
+      ListCachePoolsRequestProto request) {
+    asyncRouterServer(() -> server.listCachePools(request.getPrevPoolName()),
+        entries -> {
+          ListCachePoolsResponseProto.Builder responseBuilder =
+              ListCachePoolsResponseProto.newBuilder();
+          responseBuilder.setHasMore(entries.hasMore());
+          for (int i=0, n=entries.size(); i<n; i++) {
+            responseBuilder.addEntries(PBHelperClient.convert(entries.get(i)));
+          }
+          return responseBuilder.build();
+        });
+    return null;
+  }
+
+  @Override
+  public ModifyAclEntriesResponseProto modifyAclEntries(
+      RpcController controller, ModifyAclEntriesRequestProto req) {
+    asyncRouterServer(() -> {
+      server.modifyAclEntries(req.getSrc(), PBHelperClient.convertAclEntry(req.getAclSpecList()));
+      return null;
+    }, vo -> VOID_MODIFYACLENTRIES_RESPONSE);
+    return null;
+  }
+
+  @Override
+  public RemoveAclEntriesResponseProto removeAclEntries(
+      RpcController controller, RemoveAclEntriesRequestProto req) {
+    asyncRouterServer(() -> {
+      server.removeAclEntries(req.getSrc(),
+          PBHelperClient.convertAclEntry(req.getAclSpecList()));
+      return null;
+    }, vo -> VOID_REMOVEACLENTRIES_RESPONSE);
+    return null;
+  }
+
+  @Override
+  public RemoveDefaultAclResponseProto removeDefaultAcl(
+      RpcController controller, RemoveDefaultAclRequestProto req) {
+    asyncRouterServer(() -> {
+      server.removeDefaultAcl(req.getSrc());
+      return null;
+    }, vo -> VOID_REMOVEDEFAULTACL_RESPONSE);
+    return null;
+  }
+
+  @Override
+  public RemoveAclResponseProto removeAcl(
+      RpcController controller,
+      RemoveAclRequestProto req) {
+    asyncRouterServer(() -> {
+      server.removeAcl(req.getSrc());
+      return null;
+    }, vo -> VOID_REMOVEACL_RESPONSE);
+    return null;
+  }
+
+  @Override
+  public SetAclResponseProto setAcl(
+      RpcController controller,
+      SetAclRequestProto req) {
+    asyncRouterServer(() -> {
+      server.setAcl(req.getSrc(), PBHelperClient.convertAclEntry(req.getAclSpecList()));
+      return null;
+    }, vo -> VOID_SETACL_RESPONSE);
+    return null;
+  }
+
+  @Override
+  public GetAclStatusResponseProto getAclStatus(
+      RpcController controller,
+      GetAclStatusRequestProto req) {
+    asyncRouterServer(() -> server.getAclStatus(req.getSrc()),
+        PBHelperClient::convert);
+    return null;
+  }
+
+  @Override
+  public CreateEncryptionZoneResponseProto createEncryptionZone(
+      RpcController controller, CreateEncryptionZoneRequestProto req) {
+    asyncRouterServer(() -> {
+      server.createEncryptionZone(req.getSrc(), req.getKeyName());
+      return null;
+    }, vo -> CreateEncryptionZoneResponseProto.newBuilder().build());
+    return null;
+  }
+
+  @Override
+  public GetEZForPathResponseProto getEZForPath(
+      RpcController controller, GetEZForPathRequestProto req) {
+    asyncRouterServer(() -> server.getEZForPath(req.getSrc()),
+        ret -> {
+          GetEZForPathResponseProto.Builder builder =
+              GetEZForPathResponseProto.newBuilder();
+          if (ret != null) {
+            builder.setZone(PBHelperClient.convert(ret));
+          }
+          return builder.build();
+        });
+    return null;
+  }
+
+  @Override
+  public ListEncryptionZonesResponseProto listEncryptionZones(
+      RpcController controller, ListEncryptionZonesRequestProto req) {
+    asyncRouterServer(() -> server.listEncryptionZones(req.getId()),
+        entries -> {
+          ListEncryptionZonesResponseProto.Builder builder =
+              ListEncryptionZonesResponseProto.newBuilder();
+          builder.setHasMore(entries.hasMore());
+          for (int i=0; i<entries.size(); i++) {
+            builder.addZones(PBHelperClient.convert(entries.get(i)));
+          }
+          return builder.build();
+        });
+    return null;
+  }
+
+  @Override
+  public ReencryptEncryptionZoneResponseProto reencryptEncryptionZone(
+      RpcController controller, ReencryptEncryptionZoneRequestProto req) {
+    asyncRouterServer(() -> {
+      server.reencryptEncryptionZone(req.getZone(),
+          PBHelperClient.convert(req.getAction()));
+      return null;
+    }, vo -> ReencryptEncryptionZoneResponseProto.newBuilder().build());
+    return null;
+  }
+
+  public ListReencryptionStatusResponseProto listReencryptionStatus(
+      RpcController controller, ListReencryptionStatusRequestProto req) {
+    asyncRouterServer(() -> server.listReencryptionStatus(req.getId()),
+        entries -> {
+          ListReencryptionStatusResponseProto.Builder builder =
+              ListReencryptionStatusResponseProto.newBuilder();
+          builder.setHasMore(entries.hasMore());
+          for (int i=0; i<entries.size(); i++) {
+            builder.addStatuses(PBHelperClient.convert(entries.get(i)));
+          }
+          return builder.build();
+        });
+    return null;
+  }
+
+  @Override
+  public SetErasureCodingPolicyResponseProto setErasureCodingPolicy(
+      RpcController controller, SetErasureCodingPolicyRequestProto req) {
+    asyncRouterServer(() -> {
+      String ecPolicyName = req.hasEcPolicyName() ?
+          req.getEcPolicyName() : null;
+      server.setErasureCodingPolicy(req.getSrc(), ecPolicyName);
+      return null;
+    }, vo -> SetErasureCodingPolicyResponseProto.newBuilder().build());
+    return null;
+  }
+
+  @Override
+  public UnsetErasureCodingPolicyResponseProto unsetErasureCodingPolicy(
+      RpcController controller, UnsetErasureCodingPolicyRequestProto req) {
+    asyncRouterServer(() -> {
+      server.unsetErasureCodingPolicy(req.getSrc());
+      return null;
+    }, vo -> UnsetErasureCodingPolicyResponseProto.newBuilder().build());
+    return null;
+  }
+
+  @Override
+  public GetECTopologyResultForPoliciesResponseProto getECTopologyResultForPolicies(
+      RpcController controller, GetECTopologyResultForPoliciesRequestProto req) {
+    asyncRouterServer(() -> {
+      ProtocolStringList policies = req.getPoliciesList();
+      return server.getECTopologyResultForPolicies(
+          policies.toArray(policies.toArray(new String[policies.size()])));
+    }, result -> {
+        GetECTopologyResultForPoliciesResponseProto.Builder builder =
+            GetECTopologyResultForPoliciesResponseProto.newBuilder();
+        builder
+            .setResponse(PBHelperClient.convertECTopologyVerifierResult(result));
+        return builder.build();
+      });
+    return null;
+  }
+
+  @Override
+  public SetXAttrResponseProto setXAttr(
+      RpcController controller,
+      SetXAttrRequestProto req) {
+    asyncRouterServer(() -> {
+      server.setXAttr(req.getSrc(), PBHelperClient.convertXAttr(req.getXAttr()),
+          PBHelperClient.convert(req.getFlag()));
+      return null;
+    }, vo -> VOID_SETXATTR_RESPONSE);
+    return null;
+  }
+
+  @Override
+  public GetXAttrsResponseProto getXAttrs(
+      RpcController controller,
+      GetXAttrsRequestProto req) {
+    asyncRouterServer(() -> server.getXAttrs(req.getSrc(),
+        PBHelperClient.convertXAttrs(req.getXAttrsList())),
+        PBHelperClient::convertXAttrsResponse);
+    return null;
+  }
+
+  @Override
+  public ListXAttrsResponseProto listXAttrs(
+      RpcController controller,
+      ListXAttrsRequestProto req) {
+    asyncRouterServer(() -> server.listXAttrs(req.getSrc()),
+        PBHelperClient::convertListXAttrsResponse);
+    return null;
+  }
+
+  @Override
+  public RemoveXAttrResponseProto removeXAttr(
+      RpcController controller,
+      RemoveXAttrRequestProto req) {
+    asyncRouterServer(() -> {
+      server.removeXAttr(req.getSrc(), PBHelperClient.convertXAttr(req.getXAttr()));
+      return null;
+    }, vo -> VOID_REMOVEXATTR_RESPONSE);
+    return null;
+  }
+
+  @Override
+  public CheckAccessResponseProto checkAccess(
+      RpcController controller,
+      CheckAccessRequestProto req) {
+    asyncRouterServer(() -> {
+      server.checkAccess(req.getPath(), PBHelperClient.convert(req.getMode()));
+      return null;
+    }, vo -> VOID_CHECKACCESS_RESPONSE);
+    return null;
+  }
+
+  @Override
+  public SetStoragePolicyResponseProto setStoragePolicy(
+      RpcController controller, SetStoragePolicyRequestProto request) {
+    asyncRouterServer(() -> {
+      server.setStoragePolicy(request.getSrc(), request.getPolicyName());
+      return null;
+    }, vo -> VOID_SET_STORAGE_POLICY_RESPONSE);
+    return null;
+  }
+
+  @Override
+  public UnsetStoragePolicyResponseProto unsetStoragePolicy(
+      RpcController controller, UnsetStoragePolicyRequestProto request) {
+    asyncRouterServer(() -> {
+      server.unsetStoragePolicy(request.getSrc());
+      return null;
+    }, vo -> VOID_UNSET_STORAGE_POLICY_RESPONSE);
+    return null;
+  }
+
+  @Override
+  public GetStoragePolicyResponseProto getStoragePolicy(
+      RpcController controller, GetStoragePolicyRequestProto request) {
+    asyncRouterServer(() -> server.getStoragePolicy(request.getPath()),
+        result -> {
+          HdfsProtos.BlockStoragePolicyProto policy = PBHelperClient.convert(result);
+          return GetStoragePolicyResponseProto.newBuilder()
+              .setStoragePolicy(policy).build();
+        });
+    return null;
+  }
+
+  @Override
+  public GetStoragePoliciesResponseProto getStoragePolicies(
+      RpcController controller, GetStoragePoliciesRequestProto request) {
+    asyncRouterServer(server::getStoragePolicies,
+        policies -> {
+          GetStoragePoliciesResponseProto.Builder builder =
+              GetStoragePoliciesResponseProto.newBuilder();
+          if (policies == null) {
+            return builder.build();
+          }
+          for (BlockStoragePolicy policy : policies) {
+            builder.addPolicies(PBHelperClient.convert(policy));
+          }
+          return builder.build();
+        });
+    return null;
+  }
+
+  public GetCurrentEditLogTxidResponseProto getCurrentEditLogTxid(
+      RpcController controller,
+      GetCurrentEditLogTxidRequestProto req) {
+    asyncRouterServer(server::getCurrentEditLogTxid,
+        result -> GetCurrentEditLogTxidResponseProto.newBuilder()
+            .setTxid(result).build());
+    return null;
+  }
+
+  @Override
+  public GetEditsFromTxidResponseProto getEditsFromTxid(
+      RpcController controller,
+      GetEditsFromTxidRequestProto req) {
+    asyncRouterServer(() -> server.getEditsFromTxid(req.getTxid()),
+        PBHelperClient::convertEditsResponse);
+    return null;
+  }
+
+  @Override
+  public GetErasureCodingPoliciesResponseProto getErasureCodingPolicies(
+      RpcController controller,
+      GetErasureCodingPoliciesRequestProto request) {
+    asyncRouterServer(server::getErasureCodingPolicies,
+        ecpInfos -> {
+          GetErasureCodingPoliciesResponseProto.Builder resBuilder =
+              GetErasureCodingPoliciesResponseProto
+                  .newBuilder();
+          for (ErasureCodingPolicyInfo info : ecpInfos) {
+            resBuilder.addEcPolicies(
+                PBHelperClient.convertErasureCodingPolicy(info));
+          }
+          return resBuilder.build();
+        });
+    return null;
+  }
+
+  @Override
+  public GetErasureCodingCodecsResponseProto getErasureCodingCodecs(
+      RpcController controller, GetErasureCodingCodecsRequestProto request) {
+    asyncRouterServer(server::getErasureCodingCodecs,
+        codecs -> {
+          GetErasureCodingCodecsResponseProto.Builder resBuilder =
+              GetErasureCodingCodecsResponseProto.newBuilder();
+          for (Map.Entry<String, String> codec : codecs.entrySet()) {
+            resBuilder.addCodec(
+                PBHelperClient.convertErasureCodingCodec(
+                    codec.getKey(), codec.getValue()));
+          }
+          return resBuilder.build();
+        });
+    return null;
+  }
+
+  @Override
+  public AddErasureCodingPoliciesResponseProto addErasureCodingPolicies(
+      RpcController controller, AddErasureCodingPoliciesRequestProto request) {
+    asyncRouterServer(() -> {
+      ErasureCodingPolicy[] policies = request.getEcPoliciesList().stream()
+          .map(PBHelperClient::convertErasureCodingPolicy)
+          .toArray(ErasureCodingPolicy[]::new);
+      return server
+          .addErasureCodingPolicies(policies);
+    }, result -> {
+        List<HdfsProtos.AddErasureCodingPolicyResponseProto> responseProtos =
+            Arrays.stream(result)
+                .map(PBHelperClient::convertAddErasureCodingPolicyResponse)
+                .collect(Collectors.toList());
+        AddErasureCodingPoliciesResponseProto response =
+            AddErasureCodingPoliciesResponseProto.newBuilder()
+                .addAllResponses(responseProtos).build();
+        return response;
+      });
+    return null;
+  }
+
+  @Override
+  public RemoveErasureCodingPolicyResponseProto removeErasureCodingPolicy(
+      RpcController controller, RemoveErasureCodingPolicyRequestProto request) {
+    asyncRouterServer(() -> {
+      server.removeErasureCodingPolicy(request.getEcPolicyName());
+      return null;
+    }, vo -> RemoveErasureCodingPolicyResponseProto.newBuilder().build());
+    return null;
+  }
+
+  @Override
+  public EnableErasureCodingPolicyResponseProto enableErasureCodingPolicy(
+      RpcController controller, EnableErasureCodingPolicyRequestProto request) {
+    asyncRouterServer(() -> {
+      server.enableErasureCodingPolicy(request.getEcPolicyName());
+      return null;
+    }, vo -> EnableErasureCodingPolicyResponseProto.newBuilder().build());
+    return null;
+  }
+
+  @Override
+  public DisableErasureCodingPolicyResponseProto disableErasureCodingPolicy(
+      RpcController controller, DisableErasureCodingPolicyRequestProto request) {
+    asyncRouterServer(() -> {
+      server.disableErasureCodingPolicy(request.getEcPolicyName());
+      return null;
+    }, vo -> DisableErasureCodingPolicyResponseProto.newBuilder().build());
+    return null;
+  }
+
+  @Override
+  public GetErasureCodingPolicyResponseProto getErasureCodingPolicy(
+      RpcController controller,
+      GetErasureCodingPolicyRequestProto request) {
+    asyncRouterServer(() -> server.getErasureCodingPolicy(request.getSrc()),
+        ecPolicy -> {
+          GetErasureCodingPolicyResponseProto.Builder builder =
+              GetErasureCodingPolicyResponseProto.newBuilder();
+          if (ecPolicy != null) {
+            builder.setEcPolicy(PBHelperClient.convertErasureCodingPolicy(ecPolicy));
+          }
+          return builder.build();
+        });
+    return null;
+  }
+
+  @Override
+  public GetQuotaUsageResponseProto getQuotaUsage(
+      RpcController controller, GetQuotaUsageRequestProto req) {
+    asyncRouterServer(() -> server.getQuotaUsage(req.getPath()),
+        result -> GetQuotaUsageResponseProto.newBuilder()
+            .setUsage(PBHelperClient.convert(result)).build());
+    return null;
+  }
+
+  @Override
+  public ListOpenFilesResponseProto listOpenFiles(
+      RpcController controller,
+      ListOpenFilesRequestProto req) {
+    asyncRouterServer(() -> {
+      EnumSet<OpenFilesIterator.OpenFilesType> openFilesTypes =
+          PBHelperClient.convertOpenFileTypes(req.getTypesList());
+      return server.listOpenFiles(req.getId(),
+          openFilesTypes, req.getPath());
+    }, entries -> {
+        ListOpenFilesResponseProto.Builder builder =
+            ListOpenFilesResponseProto.newBuilder();
+        builder.setHasMore(entries.hasMore());
+        for (int i = 0; i < entries.size(); i++) {
+          builder.addEntries(PBHelperClient.convert(entries.get(i)));
+        }
+        builder.addAllTypes(req.getTypesList());
+        return builder.build();
+      });
+    return null;
+  }
+
+  @Override
+  public MsyncResponseProto msync(RpcController controller, MsyncRequestProto req) {
+    asyncRouterServer(() -> {
+      server.msync();
+      return null;
+    }, vo -> MsyncResponseProto.newBuilder().build());
+    return null;
+  }
+
+  @Override
+  public SatisfyStoragePolicyResponseProto satisfyStoragePolicy(
+      RpcController controller, SatisfyStoragePolicyRequestProto request) {
+    asyncRouterServer(() -> {
+      server.satisfyStoragePolicy(request.getSrc());
+      return null;
+    }, vo -> VOID_SATISFYSTORAGEPOLICY_RESPONSE);
+    return null;
+  }
+
+  @Override
+  public HAServiceStateResponseProto getHAServiceState(
+      RpcController controller,
+      HAServiceStateRequestProto request) {
+    asyncRouterServer(server::getHAServiceState,
+        state -> {
+          HAServiceProtocolProtos.HAServiceStateProto retState;
+          switch (state) {
+            case ACTIVE:
+              retState = HAServiceProtocolProtos.HAServiceStateProto.ACTIVE;
+              break;
+            case STANDBY:
+              retState = HAServiceProtocolProtos.HAServiceStateProto.STANDBY;
+              break;
+            case OBSERVER:
+              retState = HAServiceProtocolProtos.HAServiceStateProto.OBSERVER;
+              break;
+            case INITIALIZING:
+            default:
+              retState = HAServiceProtocolProtos.HAServiceStateProto.INITIALIZING;
+              break;
+          }
+          HAServiceStateResponseProto.Builder builder =
+              HAServiceStateResponseProto.newBuilder();
+          builder.setState(retState);
+          return builder.build();
+        });
+    return null;
+  }
+
+  @Override
+  public GetSlowDatanodeReportResponseProto getSlowDatanodeReport(
+      RpcController controller,
+      GetSlowDatanodeReportRequestProto request) {
+    asyncRouterServer(server::getSlowDatanodeReport,
+        res -> {
+          List<? extends HdfsProtos.DatanodeInfoProto> result =
+              PBHelperClient.convert(res);
+          return GetSlowDatanodeReportResponseProto.newBuilder()
+              .addAllDatanodeInfoProto(result)
+              .build();
+        });
+    return null;
+  }
+
+  @Override
+  public GetEnclosingRootResponseProto getEnclosingRoot(
+      RpcController controller, GetEnclosingRootRequestProto req) {
+    asyncRouterServer(() -> server.getEnclosingRoot(req.getFilename()),
+        enclosingRootPath -> ClientNamenodeProtocolProtos
+            .GetEnclosingRootResponseProto.newBuilder()
+            .setEnclosingRootPath(enclosingRootPath.toUri().toString())
+            .build());
+    return null;
+  }
+}

+ 59 - 0
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/protocolPB/RouterGetUserMappingsProtocolServerSideTranslatorPB.java

@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.protocolPB;
+
+import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer;
+import org.apache.hadoop.thirdparty.protobuf.RpcController;
+import org.apache.hadoop.thirdparty.protobuf.ServiceException;
+import org.apache.hadoop.tools.GetUserMappingsProtocol;
+import org.apache.hadoop.tools.proto.GetUserMappingsProtocolProtos.GetGroupsForUserRequestProto;
+import org.apache.hadoop.tools.proto.GetUserMappingsProtocolProtos.GetGroupsForUserResponseProto;
+import org.apache.hadoop.tools.protocolPB.GetUserMappingsProtocolServerSideTranslatorPB;
+
+import static org.apache.hadoop.hdfs.protocolPB.AsyncRpcProtocolPBUtil.asyncRouterServer;
+
+public class RouterGetUserMappingsProtocolServerSideTranslatorPB
+    extends GetUserMappingsProtocolServerSideTranslatorPB {
+  private final RouterRpcServer server;
+  private final boolean isAsyncRpc;
+
+  public RouterGetUserMappingsProtocolServerSideTranslatorPB(GetUserMappingsProtocol impl) {
+    super(impl);
+    this.server = (RouterRpcServer) impl;
+    this.isAsyncRpc = server.isAsync();
+  }
+
+  @Override
+  public GetGroupsForUserResponseProto getGroupsForUser(
+      RpcController controller,
+      GetGroupsForUserRequestProto request) throws ServiceException {
+    if (!isAsyncRpc) {
+      return super.getGroupsForUser(controller, request);
+    }
+    asyncRouterServer(() -> server.getGroupsForUser(request.getUser()), groups -> {
+      GetGroupsForUserResponseProto.Builder builder =
+          GetGroupsForUserResponseProto
+          .newBuilder();
+      for (String g : groups) {
+        builder.addGroups(g);
+      }
+      return builder.build();
+    });
+    return null;
+  }
+}

+ 276 - 0
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/protocolPB/RouterNamenodeProtocolServerSideTranslatorPB.java

@@ -0,0 +1,276 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.protocolPB;
+
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.VersionRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.VersionResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.EndCheckpointRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.EndCheckpointResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.ErrorReportRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.ErrorReportResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetBlockKeysRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetBlockKeysResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetBlocksRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetBlocksResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetEditLogManifestRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetEditLogManifestResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetMostRecentCheckpointTxIdRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetMostRecentCheckpointTxIdResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetMostRecentNameNodeFileTxIdRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetMostRecentNameNodeFileTxIdResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetNextSPSPathRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetNextSPSPathResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetTransactionIdRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetTransactionIdResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.IsRollingUpgradeRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.IsRollingUpgradeResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.IsUpgradeFinalizedRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.IsUpgradeFinalizedResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.RegisterRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.RegisterResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.RollEditLogRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.RollEditLogResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.StartCheckpointRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.StartCheckpointResponseProto;
+import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer;
+import org.apache.hadoop.hdfs.server.namenode.NNStorage;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
+import org.apache.hadoop.thirdparty.protobuf.RpcController;
+import org.apache.hadoop.thirdparty.protobuf.ServiceException;
+
+import static org.apache.hadoop.hdfs.protocolPB.AsyncRpcProtocolPBUtil.asyncRouterServer;
+
+public class RouterNamenodeProtocolServerSideTranslatorPB
+    extends NamenodeProtocolServerSideTranslatorPB {
+
+  private final RouterRpcServer server;
+  private final boolean isAsyncRpc;
+
+  public RouterNamenodeProtocolServerSideTranslatorPB(NamenodeProtocol impl) {
+    super(impl);
+    this.server = (RouterRpcServer) impl;
+    this.isAsyncRpc = server.isAsync();
+  }
+
+  @Override
+  public GetBlocksResponseProto getBlocks(RpcController unused,
+      GetBlocksRequestProto request) throws ServiceException {
+    if (!isAsyncRpc) {
+      return super.getBlocks(unused, request);
+    }
+    asyncRouterServer(() -> {
+      DatanodeInfo dnInfo = new DatanodeInfo.DatanodeInfoBuilder()
+          .setNodeID(PBHelperClient.convert(request.getDatanode()))
+          .build();
+      return server.getBlocks(dnInfo, request.getSize(),
+          request.getMinBlockSize(), request.getTimeInterval(),
+          request.hasStorageType() ?
+              PBHelperClient.convertStorageType(request.getStorageType()): null);
+    }, blocks ->
+        GetBlocksResponseProto.newBuilder()
+            .setBlocks(PBHelper.convert(blocks)).build());
+    return null;
+  }
+
+  @Override
+  public GetBlockKeysResponseProto getBlockKeys(RpcController unused,
+      GetBlockKeysRequestProto request) throws ServiceException {
+    if (!isAsyncRpc) {
+      return super.getBlockKeys(unused, request);
+    }
+    asyncRouterServer(server::getBlockKeys, keys -> {
+      GetBlockKeysResponseProto.Builder builder =
+          GetBlockKeysResponseProto.newBuilder();
+      if (keys != null) {
+        builder.setKeys(PBHelper.convert(keys));
+      }
+      return builder.build();
+    });
+    return null;
+  }
+
+  @Override
+  public GetTransactionIdResponseProto getTransactionId(RpcController unused,
+      GetTransactionIdRequestProto request) throws ServiceException {
+    if (!isAsyncRpc) {
+      return super.getTransactionId(unused, request);
+    }
+    asyncRouterServer(server::getTransactionID,
+        txid -> GetTransactionIdResponseProto
+            .newBuilder().setTxId(txid).build());
+    return null;
+  }
+
+  @Override
+  public GetMostRecentCheckpointTxIdResponseProto getMostRecentCheckpointTxId(
+      RpcController unused, GetMostRecentCheckpointTxIdRequestProto request)
+      throws ServiceException {
+    if (!isAsyncRpc) {
+      return super.getMostRecentCheckpointTxId(unused, request);
+    }
+    asyncRouterServer(server::getMostRecentCheckpointTxId,
+        txid -> GetMostRecentCheckpointTxIdResponseProto
+            .newBuilder().setTxId(txid).build());
+    return null;
+  }
+
+  @Override
+  public GetMostRecentNameNodeFileTxIdResponseProto getMostRecentNameNodeFileTxId(
+      RpcController unused, GetMostRecentNameNodeFileTxIdRequestProto request)
+      throws ServiceException {
+    if (!isAsyncRpc) {
+      return super.getMostRecentNameNodeFileTxId(unused, request);
+    }
+    asyncRouterServer(() -> server.getMostRecentNameNodeFileTxId(
+        NNStorage.NameNodeFile.valueOf(request.getNameNodeFile())),
+        txid -> GetMostRecentNameNodeFileTxIdResponseProto
+            .newBuilder().setTxId(txid).build());
+    return null;
+  }
+
+  @Override
+  public RollEditLogResponseProto rollEditLog(RpcController unused,
+      RollEditLogRequestProto request) throws ServiceException {
+    if (!isAsyncRpc) {
+      return super.rollEditLog(unused, request);
+    }
+    asyncRouterServer(server::rollEditLog,
+        signature -> RollEditLogResponseProto.newBuilder()
+            .setSignature(PBHelper.convert(signature)).build());
+    return null;
+  }
+
+  @Override
+  public ErrorReportResponseProto errorReport(RpcController unused,
+      ErrorReportRequestProto request) throws ServiceException {
+    if (!isAsyncRpc) {
+      return super.errorReport(unused, request);
+    }
+    asyncRouterServer(() -> {
+      server.errorReport(PBHelper.convert(request.getRegistration()),
+          request.getErrorCode(), request.getMsg());
+      return null;
+    }, result -> VOID_ERROR_REPORT_RESPONSE);
+    return null;
+  }
+
+  @Override
+  public RegisterResponseProto registerSubordinateNamenode(
+      RpcController unused, RegisterRequestProto request) throws ServiceException {
+    if (!isAsyncRpc) {
+      return super.registerSubordinateNamenode(unused, request);
+    }
+    asyncRouterServer(() -> server.registerSubordinateNamenode(
+        PBHelper.convert(request.getRegistration())),
+        reg -> RegisterResponseProto.newBuilder()
+            .setRegistration(PBHelper.convert(reg)).build());
+    return null;
+  }
+
+  @Override
+  public StartCheckpointResponseProto startCheckpoint(RpcController unused,
+      StartCheckpointRequestProto request) throws ServiceException {
+    if (!isAsyncRpc) {
+      return super.startCheckpoint(unused, request);
+    }
+    asyncRouterServer(() ->
+            server.startCheckpoint(PBHelper.convert(request.getRegistration())),
+        cmd -> StartCheckpointResponseProto.newBuilder()
+            .setCommand(PBHelper.convert(cmd)).build());
+    return null;
+  }
+
+
+  @Override
+  public EndCheckpointResponseProto endCheckpoint(RpcController unused,
+      EndCheckpointRequestProto request) throws ServiceException {
+    if (!isAsyncRpc) {
+      return super.endCheckpoint(unused, request);
+    }
+    asyncRouterServer(() -> {
+      server.endCheckpoint(PBHelper.convert(request.getRegistration()),
+          PBHelper.convert(request.getSignature()));
+      return null;
+    }, result -> VOID_END_CHECKPOINT_RESPONSE);
+    return null;
+  }
+
+  @Override
+  public GetEditLogManifestResponseProto getEditLogManifest(
+      RpcController unused, GetEditLogManifestRequestProto request) throws ServiceException {
+    if (!isAsyncRpc) {
+      return super.getEditLogManifest(unused, request);
+    }
+    asyncRouterServer(() -> server.getEditLogManifest(request.getSinceTxId()),
+        manifest -> GetEditLogManifestResponseProto.newBuilder()
+            .setManifest(PBHelper.convert(manifest)).build());
+    return null;
+  }
+
+  @Override
+  public VersionResponseProto versionRequest(
+      RpcController controller,
+      VersionRequestProto request) throws ServiceException {
+    if (!isAsyncRpc) {
+      return super.versionRequest(controller, request);
+    }
+    asyncRouterServer(server::versionRequest,
+        info -> VersionResponseProto.newBuilder()
+            .setInfo(PBHelper.convert(info)).build());
+    return null;
+  }
+
+  @Override
+  public IsUpgradeFinalizedResponseProto isUpgradeFinalized(RpcController controller,
+      IsUpgradeFinalizedRequestProto request) throws ServiceException {
+    if (!isAsyncRpc) {
+      return super.isUpgradeFinalized(controller, request);
+    }
+    asyncRouterServer(server::isUpgradeFinalized,
+        isUpgradeFinalized -> IsUpgradeFinalizedResponseProto.newBuilder()
+            .setIsUpgradeFinalized(isUpgradeFinalized).build());
+    return null;
+  }
+
+  @Override
+  public IsRollingUpgradeResponseProto isRollingUpgrade(
+      RpcController controller, IsRollingUpgradeRequestProto request)
+      throws ServiceException {
+    if (!isAsyncRpc) {
+      return super.isRollingUpgrade(controller, request);
+    }
+    asyncRouterServer(server::isRollingUpgrade,
+        isRollingUpgrade -> IsRollingUpgradeResponseProto.newBuilder()
+            .setIsRollingUpgrade(isRollingUpgrade).build());
+    return null;
+  }
+
+  @Override
+  public GetNextSPSPathResponseProto getNextSPSPath(
+      RpcController controller, GetNextSPSPathRequestProto request)
+      throws ServiceException {
+    if (!isAsyncRpc) {
+      return super.getNextSPSPath(controller, request);
+    }
+    asyncRouterServer(server::getNextSPSPath,
+        nextSPSPath -> GetNextSPSPathResponseProto.newBuilder()
+            .setSpsPath(nextSPSPath).build());
+    return null;
+  }
+}

+ 83 - 0
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/protocolPB/RouterRefreshUserMappingsProtocolServerSideTranslatorPB.java

@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.protocolPB;
+
+import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer;
+import org.apache.hadoop.security.RefreshUserMappingsProtocol;
+import org.apache.hadoop.security.proto.RefreshUserMappingsProtocolProtos.RefreshSuperUserGroupsConfigurationRequestProto;
+import org.apache.hadoop.security.proto.RefreshUserMappingsProtocolProtos.RefreshSuperUserGroupsConfigurationResponseProto;
+import org.apache.hadoop.security.proto.RefreshUserMappingsProtocolProtos.RefreshUserToGroupsMappingsRequestProto;
+import org.apache.hadoop.security.proto.RefreshUserMappingsProtocolProtos.RefreshUserToGroupsMappingsResponseProto;
+import org.apache.hadoop.security.protocolPB.RefreshUserMappingsProtocolServerSideTranslatorPB;
+import org.apache.hadoop.thirdparty.protobuf.RpcController;
+import org.apache.hadoop.thirdparty.protobuf.ServiceException;
+
+import static org.apache.hadoop.hdfs.protocolPB.AsyncRpcProtocolPBUtil.asyncRouterServer;
+
+public class RouterRefreshUserMappingsProtocolServerSideTranslatorPB
+    extends RefreshUserMappingsProtocolServerSideTranslatorPB {
+
+  private final RouterRpcServer server;
+  private final boolean isAsyncRpc;
+
+  private final static RefreshUserToGroupsMappingsResponseProto
+      VOID_REFRESH_USER_GROUPS_MAPPING_RESPONSE =
+      RefreshUserToGroupsMappingsResponseProto.newBuilder().build();
+
+  private final static RefreshSuperUserGroupsConfigurationResponseProto
+      VOID_REFRESH_SUPERUSER_GROUPS_CONFIGURATION_RESPONSE =
+      RefreshSuperUserGroupsConfigurationResponseProto.newBuilder()
+          .build();
+
+  public RouterRefreshUserMappingsProtocolServerSideTranslatorPB(
+      RefreshUserMappingsProtocol impl) {
+    super(impl);
+    this.server = (RouterRpcServer) impl;
+    this.isAsyncRpc = server.isAsync();
+  }
+
+  @Override
+  public RefreshUserToGroupsMappingsResponseProto refreshUserToGroupsMappings(
+      RpcController controller, RefreshUserToGroupsMappingsRequestProto request)
+      throws ServiceException {
+    if (!isAsyncRpc) {
+      return super.refreshUserToGroupsMappings(controller, request);
+    }
+    asyncRouterServer(() -> {
+      server.refreshUserToGroupsMappings();
+      return null;
+    }, result ->
+        VOID_REFRESH_USER_GROUPS_MAPPING_RESPONSE);
+    return null;
+  }
+
+  @Override
+  public RefreshSuperUserGroupsConfigurationResponseProto refreshSuperUserGroupsConfiguration(
+      RpcController controller,
+      RefreshSuperUserGroupsConfigurationRequestProto request) throws ServiceException {
+    if (!isAsyncRpc) {
+      return super.refreshSuperUserGroupsConfiguration(controller, request);
+    }
+    asyncRouterServer(() -> {
+      server.refreshSuperUserGroupsConfiguration();
+      return null;
+    }, result ->
+        VOID_REFRESH_SUPERUSER_GROUPS_CONFIGURATION_RESPONSE);
+    return null;
+  }
+}

+ 5 - 1
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/NamenodeBeanMetrics.java

@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hdfs.server.federation.metrics;
 
+import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.syncReturn;
 import static org.apache.hadoop.util.Time.now;
 
 import java.io.IOException;
@@ -479,6 +480,9 @@ public class NamenodeBeanMetrics
           this.router.getRpcServer().getClientProtocolModule();
       DatanodeStorageReport[] datanodeStorageReports =
           clientProtocol.getDatanodeStorageReport(type, false, dnReportTimeOut);
+      if (router.getRpcServer().isAsync()) {
+        datanodeStorageReports = syncReturn(DatanodeStorageReport[].class);
+      }
       for (DatanodeStorageReport datanodeStorageReport : datanodeStorageReports) {
         DatanodeInfo node = datanodeStorageReport.getDatanodeInfo();
         StorageReport[] storageReports = datanodeStorageReport.getStorageReports();
@@ -512,7 +516,7 @@ public class NamenodeBeanMetrics
       LOG.error("Cannot get {} nodes, Router in safe mode", type);
     } catch (SubClusterTimeoutException e) {
       LOG.error("Cannot get {} nodes, subclusters timed out responding", type);
-    } catch (IOException e) {
+    } catch (Exception e) {
       LOG.error("Cannot get " + type + " nodes", e);
     }
     return JSON.toString(info);

+ 8 - 2
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/RBFMetrics.java

@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hdfs.server.federation.metrics;
 
+import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.syncReturn;
 import static org.apache.hadoop.metrics2.impl.MsInfo.ProcessName;
 import static org.apache.hadoop.util.Time.now;
 
@@ -559,7 +560,12 @@ public class RBFMetrics implements RouterMBean, FederationMBean {
       DatanodeInfo[] live = null;
       if (this.enableGetDNUsage) {
         RouterRpcServer rpcServer = this.router.getRpcServer();
-        live = rpcServer.getDatanodeReport(DatanodeReportType.LIVE, false, timeOut);
+        if (rpcServer.isAsync()) {
+          rpcServer.getDatanodeReportAsync(DatanodeReportType.LIVE, false, timeOut);
+          live = syncReturn(DatanodeInfo[].class);
+        } else {
+          live = rpcServer.getDatanodeReport(DatanodeReportType.LIVE, false, timeOut);
+        }
       } else {
         LOG.debug("Getting node usage is disabled.");
       }
@@ -578,7 +584,7 @@ public class RBFMetrics implements RouterMBean, FederationMBean {
         StandardDeviation deviation = new StandardDeviation();
         dev = deviation.evaluate(usages);
       }
-    } catch (IOException e) {
+    } catch (Exception e) {
       LOG.error("Cannot get the live nodes: {}", e.getMessage());
     }
 

+ 34 - 10
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java

@@ -43,6 +43,7 @@ import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncU
 import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncForEach;
 import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncReturn;
 import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncTry;
+import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.syncReturn;
 import static org.apache.hadoop.tools.fedbalance.FedBalanceConfigs.SCHEDULER_JOURNAL_URI;
 
 import java.io.FileNotFoundException;
@@ -75,6 +76,10 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.HAUtil;
 import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
 import org.apache.hadoop.hdfs.protocolPB.AsyncRpcProtocolPBUtil;
+import org.apache.hadoop.hdfs.protocolPB.RouterClientNamenodeProtocolServerSideTranslatorPB;
+import org.apache.hadoop.hdfs.protocolPB.RouterGetUserMappingsProtocolServerSideTranslatorPB;
+import org.apache.hadoop.hdfs.protocolPB.RouterNamenodeProtocolServerSideTranslatorPB;
+import org.apache.hadoop.hdfs.protocolPB.RouterRefreshUserMappingsProtocolServerSideTranslatorPB;
 import org.apache.hadoop.hdfs.server.federation.router.async.AsyncQuota;
 import org.apache.hadoop.hdfs.server.federation.router.async.RouterAsyncClientProtocol;
 import org.apache.hadoop.hdfs.server.federation.router.async.RouterAsyncNamenodeProtocol;
@@ -328,26 +333,39 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol,
 
     RPC.setProtocolEngine(this.conf, ClientNamenodeProtocolPB.class,
         ProtobufRpcEngine2.class);
-
     ClientNamenodeProtocolServerSideTranslatorPB
-        clientProtocolServerTranslator =
-            new ClientNamenodeProtocolServerSideTranslatorPB(this);
+        clientProtocolServerTranslator = null;
+    NamenodeProtocolServerSideTranslatorPB namenodeProtocolXlator = null;
+    RefreshUserMappingsProtocolServerSideTranslatorPB refreshUserMappingXlator = null;
+    GetUserMappingsProtocolServerSideTranslatorPB getUserMappingXlator = null;
+    if (isAsync()) {
+      clientProtocolServerTranslator =
+          new RouterClientNamenodeProtocolServerSideTranslatorPB(this);
+      namenodeProtocolXlator =
+          new RouterNamenodeProtocolServerSideTranslatorPB(this);
+      refreshUserMappingXlator =
+          new RouterRefreshUserMappingsProtocolServerSideTranslatorPB(this);
+      getUserMappingXlator =
+          new RouterGetUserMappingsProtocolServerSideTranslatorPB(this);
+    } else {
+      clientProtocolServerTranslator = new ClientNamenodeProtocolServerSideTranslatorPB(this);
+      namenodeProtocolXlator =
+          new NamenodeProtocolServerSideTranslatorPB(this);
+      refreshUserMappingXlator =
+          new RefreshUserMappingsProtocolServerSideTranslatorPB(this);
+      getUserMappingXlator =
+          new GetUserMappingsProtocolServerSideTranslatorPB(this);
+    }
     BlockingService clientNNPbService = ClientNamenodeProtocol
         .newReflectiveBlockingService(clientProtocolServerTranslator);
 
-    NamenodeProtocolServerSideTranslatorPB namenodeProtocolXlator =
-        new NamenodeProtocolServerSideTranslatorPB(this);
     BlockingService nnPbService = NamenodeProtocolService
         .newReflectiveBlockingService(namenodeProtocolXlator);
 
-    RefreshUserMappingsProtocolServerSideTranslatorPB refreshUserMappingXlator =
-        new RefreshUserMappingsProtocolServerSideTranslatorPB(this);
     BlockingService refreshUserMappingService =
         RefreshUserMappingsProtocolProtos.RefreshUserMappingsProtocolService.
         newReflectiveBlockingService(refreshUserMappingXlator);
 
-    GetUserMappingsProtocolServerSideTranslatorPB getUserMappingXlator =
-        new GetUserMappingsProtocolServerSideTranslatorPB(this);
     BlockingService getUserMappingService =
         GetUserMappingsProtocolProtos.GetUserMappingsProtocolService.
         newReflectiveBlockingService(getUserMappingXlator);
@@ -428,7 +446,8 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol,
     // Create the client
     if (this.enableAsync) {
       this.rpcClient = new RouterAsyncRpcClient(this.conf, this.router,
-          this.namenodeResolver, this.rpcMonitor, routerStateIdContext);
+          this.namenodeResolver, this.rpcMonitor,
+          routerStateIdContext, asyncRouterHandler);
       this.clientProto = new RouterAsyncClientProtocol(conf, this);
       this.nnProto = new RouterAsyncNamenodeProtocol(this);
       this.routerProto = new RouterAsyncUserProtocol(this);
@@ -1336,8 +1355,13 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol,
 
     try {
       DatanodeInfo[] dns = clientProto.getDatanodeReport(type);
+      if (router.getRpcServer().isAsync()) {
+        dns = syncReturn(DatanodeInfo[].class);
+      }
       LOG.debug("Refresh cached DN report with {} datanodes", dns.length);
       return dns;
+    } catch (Exception e) {
+      throw new IOException(e);
     } finally {
       // Reset ugi to remote user for remaining operations.
       RouterRpcServer.resetCurrentUser();

+ 6 - 0
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncClientProtocol.java

@@ -794,6 +794,12 @@ public class RouterAsyncClientProtocol extends RouterClientProtocol {
     return rpcServer.getDatanodeReportAsync(type, true, 0);
   }
 
+  @Override
+  public DatanodeInfo[] getSlowDatanodeReport() throws IOException {
+    rpcServer.checkOperation(NameNode.OperationCategory.UNCHECKED);
+    return rpcServer.getSlowDatanodeReportAsync(true, 0);
+  }
+
   @Override
   public DatanodeStorageReport[] getDatanodeStorageReport(
       HdfsConstants.DatanodeReportType type) throws IOException {

+ 5 - 4
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncRpcClient.java

@@ -108,16 +108,17 @@ public class RouterAsyncRpcClient extends RouterRpcClient{
    * @param resolver A NN resolver to determine the currently active NN in HA.
    * @param monitor Optional performance monitor.
    * @param routerStateIdContext the router state context object to hold the state ids for all
+   * @param asyncRouterHandler async router handler
    * namespaces.
    */
-  public RouterAsyncRpcClient(
-      Configuration conf, Router router, ActiveNamenodeResolver resolver,
-      RouterRpcMonitor monitor, RouterStateIdContext routerStateIdContext) {
+  public RouterAsyncRpcClient(Configuration conf,
+      Router router, ActiveNamenodeResolver resolver, RouterRpcMonitor monitor,
+      RouterStateIdContext routerStateIdContext, Executor asyncRouterHandler) {
     super(conf, router, resolver, monitor, routerStateIdContext);
     this.router = router;
     this.namenodeResolver = resolver;
     this.rpcMonitor = monitor;
-    this.asyncRouterHandler = router.getRpcServer().getAsyncRouterHandler();
+    this.asyncRouterHandler = asyncRouterHandler;
   }
 
   /**

+ 5 - 0
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/utils/AsyncUtil.java

@@ -154,6 +154,11 @@ public final class AsyncUtil {
     CUR_COMPLETABLE_FUTURE.set((CompletableFuture<Object>) completableFuture);
   }
 
+  public static CompletableFuture<Object> getAsyncUtilCompletableFuture() {
+    assert CUR_COMPLETABLE_FUTURE.get() != null;
+    return CUR_COMPLETABLE_FUTURE.get();
+  }
+
   /**
    * Completes the current asynchronous operation with an exception.
    * This method sets the result of the current thread's {@link CompletableFuture}

+ 18 - 14
hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java

@@ -164,12 +164,12 @@ public class TestRouterRpc {
   private static final Logger LOG =
       LoggerFactory.getLogger(TestRouterRpc.class);
 
-  private static final int NUM_SUBCLUSTERS = 2;
+  protected static final int NUM_SUBCLUSTERS = 2;
   // We need at least 6 DNs to test Erasure Coding with RS-6-3-64k
-  private static final int NUM_DNS = 6;
+  protected static final int NUM_DNS = 6;
 
 
-  private static final Comparator<ErasureCodingPolicyInfo> EC_POLICY_CMP =
+  protected static final Comparator<ErasureCodingPolicyInfo> EC_POLICY_CMP =
       new Comparator<ErasureCodingPolicyInfo>() {
         public int compare(
             ErasureCodingPolicyInfo ec0,
@@ -215,6 +215,18 @@ public class TestRouterRpc {
 
   @BeforeClass
   public static void globalSetUp() throws Exception {
+    // Start routers with only an RPC service
+    Configuration routerConf = new RouterConfigBuilder()
+        .metrics()
+        .rpc()
+        .build();
+    // We decrease the DN cache times to make the test faster
+    routerConf.setTimeDuration(
+        RBFConfigKeys.DN_REPORT_CACHE_EXPIRE, 1, TimeUnit.SECONDS);
+    setUp(routerConf);
+  }
+
+  public static void setUp(Configuration routerConf) throws Exception {
     Configuration namenodeConf = new Configuration();
     namenodeConf.setBoolean(DFSConfigKeys.HADOOP_CALLER_CONTEXT_ENABLED_KEY,
         true);
@@ -243,14 +255,6 @@ public class TestRouterRpc {
     // Start NNs and DNs and wait until ready
     cluster.startCluster();
 
-    // Start routers with only an RPC service
-    Configuration routerConf = new RouterConfigBuilder()
-        .metrics()
-        .rpc()
-        .build();
-    // We decrease the DN cache times to make the test faster
-    routerConf.setTimeDuration(
-        RBFConfigKeys.DN_REPORT_CACHE_EXPIRE, 1, TimeUnit.SECONDS);
     cluster.addRouterOverrides(routerConf);
     cluster.startRouters();
 
@@ -1261,8 +1265,8 @@ public class TestRouterRpc {
     createFile(routerFS, targetFile, existingFileSize);
     // Concat in same namespaces, succeeds
     testConcat(srcEmptyFile, targetFile, true, true,
-        "org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.HadoopIllegalArgumentException): concat: source file "
-            + srcEmptyFile + " is invalid or empty or underConstruction");
+        "org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.HadoopIllegalArgumentException): "
+            + "concat: source file " + srcEmptyFile + " is invalid or empty or underConstruction");
   }
 
   @Test
@@ -2024,7 +2028,7 @@ public class TestRouterRpc {
   }
 
   @Test
-  public void testgetGroupsForUser() throws IOException {
+  public void testgetGroupsForUser() throws Exception {
     String[] group = new String[] {"bar", "group2"};
     UserGroupInformation.createUserForTesting("user",
         new String[] {"bar", "group2"});

+ 2 - 1
hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncProtocolTestBase.java

@@ -112,7 +112,8 @@ public class RouterAsyncProtocolTestBase {
     RouterAsyncRpcClient asyncRpcClient = new RouterAsyncRpcClient(
         routerConf, router.getRouter(), routerRpcServer.getNamenodeResolver(),
         routerRpcServer.getRPCMonitor(),
-        routerRpcServer.getRouterStateIdContext());
+        routerRpcServer.getRouterStateIdContext(),
+        routerRpcServer.getAsyncRouterHandler());
     routerAsyncRpcServer = Mockito.spy(routerRpcServer);
     Mockito.when(routerAsyncRpcServer.getRPCClient()).thenReturn(asyncRpcClient);
     Mockito.when(routerAsyncRpcServer.isAsync()).thenReturn(true);

+ 2 - 1
hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncErasureCoding.java

@@ -126,7 +126,8 @@ public class TestRouterAsyncErasureCoding {
     RouterAsyncRpcClient asyncRpcClient = new RouterAsyncRpcClient(
         routerConf, router.getRouter(), routerRpcServer.getNamenodeResolver(),
         routerRpcServer.getRPCMonitor(),
-        routerRpcServer.getRouterStateIdContext());
+        routerRpcServer.getRouterStateIdContext(),
+        routerRpcServer.getAsyncRouterHandler());
     RouterRpcServer spy = Mockito.spy(routerRpcServer);
     Mockito.when(spy.getRPCClient()).thenReturn(asyncRpcClient);
     asyncErasureCoding = new AsyncErasureCoding(spy);

+ 2 - 1
hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncQuota.java

@@ -120,7 +120,8 @@ public class TestRouterAsyncQuota {
     RouterAsyncRpcClient asyncRpcClient = new RouterAsyncRpcClient(
         routerConf, router.getRouter(), routerRpcServer.getNamenodeResolver(),
         routerRpcServer.getRPCMonitor(),
-        routerRpcServer.getRouterStateIdContext());
+        routerRpcServer.getRouterStateIdContext(),
+        routerRpcServer.getAsyncRouterHandler());
     RouterRpcServer spy = Mockito.spy(routerRpcServer);
     Mockito.when(spy.getRPCClient()).thenReturn(asyncRpcClient);
     asyncQuota = new AsyncQuota(router.getRouter(), spy);

+ 76 - 0
hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncRpc.java

@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router.async;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster;
+import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
+import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;
+import org.apache.hadoop.hdfs.server.federation.router.TestRouterRpc;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ENABLE_ASYNC;
+import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.syncReturn;
+import static org.junit.Assert.assertArrayEquals;
+
+/**
+ * Testing the asynchronous RPC functionality of the router.
+ */
+public class TestRouterAsyncRpc extends TestRouterRpc {
+  private static MiniRouterDFSCluster cluster;
+  private MiniRouterDFSCluster.RouterContext rndRouter;
+
+  @BeforeClass
+  public static void globalSetUp() throws Exception {
+    // Start routers with only an RPC service
+    Configuration routerConf = new RouterConfigBuilder()
+        .metrics()
+        .rpc()
+        .build();
+    // We decrease the DN cache times to make the test faster
+    routerConf.setTimeDuration(
+        RBFConfigKeys.DN_REPORT_CACHE_EXPIRE, 1, TimeUnit.SECONDS);
+    // use async router.
+    routerConf.setBoolean(DFS_ROUTER_RPC_ENABLE_ASYNC, true);
+    setUp(routerConf);
+  }
+
+  @Before
+  public void testSetup() throws Exception {
+    super.testSetup();
+    cluster = super.getCluster();
+    // Random router for this test
+    rndRouter = cluster.getRandomRouter();
+  }
+
+  @Test
+  @Override
+  public void testgetGroupsForUser() throws Exception {
+    String[] group = new String[] {"bar", "group2"};
+    UserGroupInformation.createUserForTesting("user",
+        new String[] {"bar", "group2"});
+    rndRouter.getRouter().getRpcServer().getGroupsForUser("user");
+    String[] result = syncReturn(String[].class);
+    assertArrayEquals(group, result);
+  }
+}

+ 2 - 1
hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncRpcClient.java

@@ -152,7 +152,8 @@ public class TestRouterAsyncRpcClient {
     asyncRpcClient = new RouterAsyncRpcClient(
         routerConf, router.getRouter(), routerRpcServer.getNamenodeResolver(),
         routerRpcServer.getRPCMonitor(),
-        routerRpcServer.getRouterStateIdContext());
+        routerRpcServer.getRouterStateIdContext(),
+        routerRpcServer.getAsyncRouterHandler());
 
     // Create a test file
     FSDataOutputStream fsDataOutputStream = routerFs.create(

+ 66 - 0
hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncRpcMultiDestination.java

@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router.async;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster;
+import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
+import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;
+import org.apache.hadoop.hdfs.server.federation.router.TestRouterRpcMultiDestination;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ENABLE_ASYNC;
+import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.syncReturn;
+import static org.junit.Assert.assertArrayEquals;
+
+/**
+ * Testing the asynchronous RPC functionality of the router with multiple mounts.
+ */
+public class TestRouterAsyncRpcMultiDestination extends TestRouterRpcMultiDestination {
+
+  @BeforeClass
+  public static void globalSetUp() throws Exception {
+    // Start routers with only an RPC service
+    Configuration routerConf = new RouterConfigBuilder()
+        .metrics()
+        .rpc()
+        .build();
+    // We decrease the DN cache times to make the test faster
+    routerConf.setTimeDuration(
+        RBFConfigKeys.DN_REPORT_CACHE_EXPIRE, 1, TimeUnit.SECONDS);
+    // use async router.
+    routerConf.setBoolean(DFS_ROUTER_RPC_ENABLE_ASYNC, true);
+    setUp(routerConf);
+  }
+
+  @Test
+  @Override
+  public void testgetGroupsForUser() throws Exception {
+    MiniRouterDFSCluster.RouterContext rndRouter = super.getRouterContext();
+    String[] group = new String[] {"bar", "group2"};
+    UserGroupInformation.createUserForTesting("user",
+        new String[] {"bar", "group2"});
+    rndRouter.getRouter().getRpcServer().getGroupsForUser("user");
+    String[] result = syncReturn(String[].class);
+    assertArrayEquals(group, result);
+  }
+}

+ 78 - 85
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java

@@ -341,117 +341,109 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
       VOID_UNSET_STORAGE_POLICY_RESPONSE =
       UnsetStoragePolicyResponseProto.newBuilder().build();
 
-  private static final CreateResponseProto VOID_CREATE_RESPONSE = 
-  CreateResponseProto.newBuilder().build();
+  protected static final CreateResponseProto VOID_CREATE_RESPONSE =
+      CreateResponseProto.newBuilder().build();
 
-  private static final SetPermissionResponseProto VOID_SET_PERM_RESPONSE = 
-  SetPermissionResponseProto.newBuilder().build();
+  protected static final SetPermissionResponseProto VOID_SET_PERM_RESPONSE =
+      SetPermissionResponseProto.newBuilder().build();
 
-  private static final SetOwnerResponseProto VOID_SET_OWNER_RESPONSE = 
-  SetOwnerResponseProto.newBuilder().build();
+  protected static final SetOwnerResponseProto VOID_SET_OWNER_RESPONSE =
+      SetOwnerResponseProto.newBuilder().build();
 
-  private static final AbandonBlockResponseProto VOID_ADD_BLOCK_RESPONSE = 
-  AbandonBlockResponseProto.newBuilder().build();
+  protected static final AbandonBlockResponseProto VOID_ADD_BLOCK_RESPONSE =
+      AbandonBlockResponseProto.newBuilder().build();
 
-  private static final ReportBadBlocksResponseProto VOID_REP_BAD_BLOCK_RESPONSE = 
-  ReportBadBlocksResponseProto.newBuilder().build();
+  protected static final ReportBadBlocksResponseProto VOID_REP_BAD_BLOCK_RESPONSE =
+      ReportBadBlocksResponseProto.newBuilder().build();
 
-  private static final ConcatResponseProto VOID_CONCAT_RESPONSE = 
-  ConcatResponseProto.newBuilder().build();
+  protected static final ConcatResponseProto VOID_CONCAT_RESPONSE =
+      ConcatResponseProto.newBuilder().build();
 
-  private static final Rename2ResponseProto VOID_RENAME2_RESPONSE = 
-  Rename2ResponseProto.newBuilder().build();
+  protected static final Rename2ResponseProto VOID_RENAME2_RESPONSE =
+      Rename2ResponseProto.newBuilder().build();
 
-  private static final GetListingResponseProto VOID_GETLISTING_RESPONSE = 
-  GetListingResponseProto.newBuilder().build();
+  protected static final GetListingResponseProto VOID_GETLISTING_RESPONSE =
+      GetListingResponseProto.newBuilder().build();
 
-  private static final GetBatchedListingResponseProto
+  protected static final GetBatchedListingResponseProto
       VOID_GETBATCHEDLISTING_RESPONSE =
       GetBatchedListingResponseProto.newBuilder()
           .setStartAfter(ByteString.copyFromUtf8(""))
           .setHasMore(false)
           .build();
 
-  private static final RenewLeaseResponseProto VOID_RENEWLEASE_RESPONSE = 
-  RenewLeaseResponseProto.newBuilder().build();
+  protected static final RenewLeaseResponseProto VOID_RENEWLEASE_RESPONSE =
+      RenewLeaseResponseProto.newBuilder().build();
 
-  private static final RefreshNodesResponseProto VOID_REFRESHNODES_RESPONSE =
-  RefreshNodesResponseProto.newBuilder().build();
+  protected static final RefreshNodesResponseProto VOID_REFRESHNODES_RESPONSE =
+      RefreshNodesResponseProto.newBuilder().build();
 
-  private static final FinalizeUpgradeResponseProto VOID_FINALIZEUPGRADE_RESPONSE = 
-  FinalizeUpgradeResponseProto.newBuilder().build();
+  protected static final FinalizeUpgradeResponseProto VOID_FINALIZEUPGRADE_RESPONSE =
+      FinalizeUpgradeResponseProto.newBuilder().build();
 
-  private static final MetaSaveResponseProto VOID_METASAVE_RESPONSE = 
-  MetaSaveResponseProto.newBuilder().build();
+  protected static final MetaSaveResponseProto VOID_METASAVE_RESPONSE =
+      MetaSaveResponseProto.newBuilder().build();
 
-  private static final GetFileInfoResponseProto VOID_GETFILEINFO_RESPONSE = 
-  GetFileInfoResponseProto.newBuilder().build();
+  protected static final GetFileInfoResponseProto VOID_GETFILEINFO_RESPONSE =
+      GetFileInfoResponseProto.newBuilder().build();
 
-  private static final GetLocatedFileInfoResponseProto
+  protected static final GetLocatedFileInfoResponseProto
       VOID_GETLOCATEDFILEINFO_RESPONSE =
           GetLocatedFileInfoResponseProto.newBuilder().build();
 
-  private static final GetFileLinkInfoResponseProto VOID_GETFILELINKINFO_RESPONSE = 
-  GetFileLinkInfoResponseProto.newBuilder().build();
+  protected static final GetFileLinkInfoResponseProto VOID_GETFILELINKINFO_RESPONSE =
+      GetFileLinkInfoResponseProto.newBuilder().build();
 
-  private static final SetQuotaResponseProto VOID_SETQUOTA_RESPONSE = 
-  SetQuotaResponseProto.newBuilder().build();
+  protected static final SetQuotaResponseProto VOID_SETQUOTA_RESPONSE =
+      SetQuotaResponseProto.newBuilder().build();
 
-  private static final FsyncResponseProto VOID_FSYNC_RESPONSE = 
-  FsyncResponseProto.newBuilder().build();
+  protected static final FsyncResponseProto VOID_FSYNC_RESPONSE =
+      FsyncResponseProto.newBuilder().build();
 
-  private static final SetTimesResponseProto VOID_SETTIMES_RESPONSE = 
-  SetTimesResponseProto.newBuilder().build();
+  protected static final SetTimesResponseProto VOID_SETTIMES_RESPONSE =
+      SetTimesResponseProto.newBuilder().build();
 
-  private static final CreateSymlinkResponseProto VOID_CREATESYMLINK_RESPONSE = 
-  CreateSymlinkResponseProto.newBuilder().build();
+  protected static final CreateSymlinkResponseProto VOID_CREATESYMLINK_RESPONSE =
+      CreateSymlinkResponseProto.newBuilder().build();
 
-  private static final UpdatePipelineResponseProto
-    VOID_UPDATEPIPELINE_RESPONSE = 
-  UpdatePipelineResponseProto.newBuilder().build();
+  protected static final UpdatePipelineResponseProto VOID_UPDATEPIPELINE_RESPONSE =
+      UpdatePipelineResponseProto.newBuilder().build();
 
-  private static final CancelDelegationTokenResponseProto 
-      VOID_CANCELDELEGATIONTOKEN_RESPONSE = 
-          CancelDelegationTokenResponseProto.newBuilder().build();
+  protected static final CancelDelegationTokenResponseProto VOID_CANCELDELEGATIONTOKEN_RESPONSE =
+      CancelDelegationTokenResponseProto.newBuilder().build();
 
-  private static final SetBalancerBandwidthResponseProto 
-      VOID_SETBALANCERBANDWIDTH_RESPONSE = 
-        SetBalancerBandwidthResponseProto.newBuilder().build();
+  protected static final SetBalancerBandwidthResponseProto VOID_SETBALANCERBANDWIDTH_RESPONSE =
+      SetBalancerBandwidthResponseProto.newBuilder().build();
 
-  private static final SetAclResponseProto
-    VOID_SETACL_RESPONSE = SetAclResponseProto.getDefaultInstance();
+  protected static final SetAclResponseProto VOID_SETACL_RESPONSE =
+      SetAclResponseProto.getDefaultInstance();
 
-  private static final ModifyAclEntriesResponseProto
-    VOID_MODIFYACLENTRIES_RESPONSE = ModifyAclEntriesResponseProto
-      .getDefaultInstance();
+  protected static final ModifyAclEntriesResponseProto VOID_MODIFYACLENTRIES_RESPONSE =
+      ModifyAclEntriesResponseProto.getDefaultInstance();
 
-  private static final RemoveAclEntriesResponseProto
-    VOID_REMOVEACLENTRIES_RESPONSE = RemoveAclEntriesResponseProto
-      .getDefaultInstance();
+  protected static final RemoveAclEntriesResponseProto VOID_REMOVEACLENTRIES_RESPONSE =
+      RemoveAclEntriesResponseProto.getDefaultInstance();
 
-  private static final RemoveDefaultAclResponseProto
-    VOID_REMOVEDEFAULTACL_RESPONSE = RemoveDefaultAclResponseProto
-      .getDefaultInstance();
+  protected static final RemoveDefaultAclResponseProto VOID_REMOVEDEFAULTACL_RESPONSE =
+      RemoveDefaultAclResponseProto.getDefaultInstance();
 
-  private static final RemoveAclResponseProto
-    VOID_REMOVEACL_RESPONSE = RemoveAclResponseProto.getDefaultInstance();
-  
-  private static final SetXAttrResponseProto
-    VOID_SETXATTR_RESPONSE = SetXAttrResponseProto.getDefaultInstance();
-  
-  private static final RemoveXAttrResponseProto
-    VOID_REMOVEXATTR_RESPONSE = RemoveXAttrResponseProto.getDefaultInstance();
+  protected static final RemoveAclResponseProto VOID_REMOVEACL_RESPONSE =
+      RemoveAclResponseProto.getDefaultInstance();
 
-  private static final CheckAccessResponseProto
-    VOID_CHECKACCESS_RESPONSE = CheckAccessResponseProto.getDefaultInstance();
+  protected static final SetXAttrResponseProto VOID_SETXATTR_RESPONSE =
+      SetXAttrResponseProto.getDefaultInstance();
 
-  private static final SatisfyStoragePolicyResponseProto
+  protected static final RemoveXAttrResponseProto VOID_REMOVEXATTR_RESPONSE =
+      RemoveXAttrResponseProto.getDefaultInstance();
+
+  protected static final CheckAccessResponseProto VOID_CHECKACCESS_RESPONSE =
+      CheckAccessResponseProto.getDefaultInstance();
+
+  protected static final SatisfyStoragePolicyResponseProto
       VOID_SATISFYSTORAGEPOLICY_RESPONSE = SatisfyStoragePolicyResponseProto
       .getDefaultInstance();
 
-  /**
-   * Constructor
-   * 
+  /** Constructor.
    * @param server - the NN server
    * @throws IOException
    */
@@ -730,7 +722,7 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
 
   @Override
   public DeleteResponseProto delete(RpcController controller,
-    DeleteRequestProto req) throws ServiceException {
+      DeleteRequestProto req) throws ServiceException {
     try {
       boolean result =  server.delete(req.getSrc(), req.getRecursive());
       return DeleteResponseProto.newBuilder().setResult(result).build();
@@ -1474,7 +1466,7 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
       CacheDirectiveInfo filter =
           PBHelperClient.convert(request.getFilter());
       BatchedEntries<CacheDirectiveEntry> entries =
-        server.listCacheDirectives(request.getPrevId(), filter);
+          server.listCacheDirectives(request.getPrevId(), filter);
       ListCacheDirectivesResponseProto.Builder builder =
           ListCacheDirectivesResponseProto.newBuilder();
       builder.setHasMore(entries.hasMore());
@@ -1525,9 +1517,9 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
       ListCachePoolsRequestProto request) throws ServiceException {
     try {
       BatchedEntries<CachePoolEntry> entries =
-        server.listCachePools(request.getPrevPoolName());
+          server.listCachePools(request.getPrevPoolName());
       ListCachePoolsResponseProto.Builder responseBuilder =
-        ListCachePoolsResponseProto.newBuilder();
+          ListCachePoolsResponseProto.newBuilder();
       responseBuilder.setHasMore(entries.hasMore());
       for (int i=0, n=entries.size(); i<n; i++) {
         responseBuilder.addEntries(PBHelperClient.convert(entries.get(i)));
@@ -1609,8 +1601,8 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
   
   @Override
   public CreateEncryptionZoneResponseProto createEncryptionZone(
-    RpcController controller, CreateEncryptionZoneRequestProto req)
-    throws ServiceException {
+      RpcController controller, CreateEncryptionZoneRequestProto req)
+      throws ServiceException {
     try {
       server.createEncryptionZone(req.getSrc(), req.getKeyName());
       return CreateEncryptionZoneResponseProto.newBuilder().build();
@@ -1638,8 +1630,8 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
 
   @Override
   public ListEncryptionZonesResponseProto listEncryptionZones(
-    RpcController controller, ListEncryptionZonesRequestProto req)
-    throws ServiceException {
+      RpcController controller, ListEncryptionZonesRequestProto req)
+      throws ServiceException {
     try {
       BatchedEntries<EncryptionZone> entries = server
           .listEncryptionZones(req.getId());
@@ -1755,7 +1747,7 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
 
   @Override
   public ListXAttrsResponseProto listXAttrs(RpcController controller,
-    ListXAttrsRequestProto req) throws ServiceException {
+                                            ListXAttrsRequestProto req) throws ServiceException {
     try {
       return PBHelperClient.convertListXAttrsResponse(server.listXAttrs(req.getSrc()));
     } catch (IOException e) {
@@ -1776,7 +1768,7 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
 
   @Override
   public CheckAccessResponseProto checkAccess(RpcController controller,
-     CheckAccessRequestProto req) throws ServiceException {
+      CheckAccessRequestProto req) throws ServiceException {
     try {
       server.checkAccess(req.getPath(), PBHelperClient.convert(req.getMode()));
     } catch (IOException e) {
@@ -1869,8 +1861,8 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
       GetErasureCodingPoliciesRequestProto request) throws ServiceException {
     try {
       ErasureCodingPolicyInfo[] ecpInfos = server.getErasureCodingPolicies();
-      GetErasureCodingPoliciesResponseProto.Builder resBuilder = GetErasureCodingPoliciesResponseProto
-          .newBuilder();
+      GetErasureCodingPoliciesResponseProto.Builder resBuilder =
+          GetErasureCodingPoliciesResponseProto.newBuilder();
       for (ErasureCodingPolicyInfo info : ecpInfos) {
         resBuilder.addEcPolicies(
             PBHelperClient.convertErasureCodingPolicy(info));
@@ -1965,7 +1957,8 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
       GetErasureCodingPolicyRequestProto request) throws ServiceException {
     try {
       ErasureCodingPolicy ecPolicy = server.getErasureCodingPolicy(request.getSrc());
-      GetErasureCodingPolicyResponseProto.Builder builder = GetErasureCodingPolicyResponseProto.newBuilder();
+      GetErasureCodingPolicyResponseProto.Builder builder =
+          GetErasureCodingPolicyResponseProto.newBuilder();
       if (ecPolicy != null) {
         builder.setEcPolicy(PBHelperClient.convertErasureCodingPolicy(ecPolicy));
       }

+ 4 - 4
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolServerSideTranslatorPB.java

@@ -73,11 +73,11 @@ public class NamenodeProtocolServerSideTranslatorPB implements
     NamenodeProtocolPB {
   private final NamenodeProtocol impl;
 
-  private final static ErrorReportResponseProto VOID_ERROR_REPORT_RESPONSE = 
-  ErrorReportResponseProto.newBuilder().build();
+  protected final static ErrorReportResponseProto VOID_ERROR_REPORT_RESPONSE =
+      ErrorReportResponseProto.newBuilder().build();
 
-  private final static EndCheckpointResponseProto VOID_END_CHECKPOINT_RESPONSE =
-  EndCheckpointResponseProto.newBuilder().build();
+  protected final static EndCheckpointResponseProto VOID_END_CHECKPOINT_RESPONSE =
+      EndCheckpointResponseProto.newBuilder().build();
 
   public NamenodeProtocolServerSideTranslatorPB(NamenodeProtocol impl) {
     this.impl = impl;