Sfoglia il codice sorgente

HDFS-17713. [ARR] Throtting asynchronous calls for each nameservice. (#7304). Contributed by hfutatzhanghb.

Reviewed-by: Jian Zhang <keepromise@apache.org>
hfutatzhanghb 2 mesi fa
parent
commit
df5e205452

+ 91 - 0
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/RouterAsyncRpcFairnessPolicyController.java

@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.federation.fairness;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.federation.router.FederationUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Set;
+
+import static org.apache.hadoop.hdfs.server.federation.fairness.RouterRpcFairnessConstants.CONCURRENT_NS;
+import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.
+    DFS_ROUTER_ASYNC_RPC_MAX_ASYNCCALL_PERMIT_KEY;
+import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.
+    DFS_ROUTER_ASYNC_RPC_MAX_ASYNC_CALL_PERMIT_DEFAULT;
+
+/**
+ * When router async rpc enabled, it is recommended to use this fairness controller.
+ */
+public class RouterAsyncRpcFairnessPolicyController extends
+    AbstractRouterRpcFairnessPolicyController {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(RouterAsyncRpcFairnessPolicyController.class);
+
+  public static final String INIT_MSG = "Max async call permits per nameservice: %d";
+
+  public RouterAsyncRpcFairnessPolicyController(Configuration conf) {
+    init(conf);
+  }
+
+  public void init(Configuration conf) throws IllegalArgumentException {
+    super.init(conf);
+
+    int maxAsyncCallPermit = conf.getInt(DFS_ROUTER_ASYNC_RPC_MAX_ASYNCCALL_PERMIT_KEY,
+        DFS_ROUTER_ASYNC_RPC_MAX_ASYNC_CALL_PERMIT_DEFAULT);
+    if (maxAsyncCallPermit <= 0) {
+      maxAsyncCallPermit = DFS_ROUTER_ASYNC_RPC_MAX_ASYNC_CALL_PERMIT_DEFAULT;
+    }
+    LOG.info(String.format(INIT_MSG, maxAsyncCallPermit));
+
+    // Get all name services configured.
+    Set<String> allConfiguredNS = FederationUtil.getAllConfiguredNS(conf);
+
+    for (String nsId : allConfiguredNS) {
+      LOG.info("Dedicated permits {} for ns {} ", maxAsyncCallPermit, nsId);
+      insertNameServiceWithPermits(nsId, maxAsyncCallPermit);
+      logAssignment(nsId, maxAsyncCallPermit);
+    }
+    // Avoid NPE when router async rpc disable.
+    insertNameServiceWithPermits(CONCURRENT_NS, maxAsyncCallPermit);
+    LOG.info("Dedicated permits {} for ns {} ", maxAsyncCallPermit, CONCURRENT_NS);
+  }
+
+  private static void logAssignment(String nsId, int count) {
+    LOG.info("Assigned {} permits to nsId {} ", count, nsId);
+  }
+
+  @Override
+  public boolean acquirePermit(String nsId) {
+    if (nsId.equals(CONCURRENT_NS)) {
+      return true;
+    }
+    return super.acquirePermit(nsId);
+  }
+
+  @Override
+  public void releasePermit(String nsId) {
+    if (nsId.equals(CONCURRENT_NS)) {
+      return;
+    }
+    super.releasePermit(nsId);
+  }
+}

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java

@@ -88,6 +88,9 @@ public class RBFConfigKeys extends CommonConfigurationKeysPublic {
   public static final String DFS_ROUTER_ASYNC_RPC_RESPONDER_COUNT_KEY =
   public static final String DFS_ROUTER_ASYNC_RPC_RESPONDER_COUNT_KEY =
           FEDERATION_ROUTER_ASYNC_RPC_PREFIX + "responder.count";
           FEDERATION_ROUTER_ASYNC_RPC_PREFIX + "responder.count";
   public static final int DFS_ROUTER_ASYNCRPC_RESPONDER_COUNT_DEFAULT = 10;
   public static final int DFS_ROUTER_ASYNCRPC_RESPONDER_COUNT_DEFAULT = 10;
+  public static final String DFS_ROUTER_ASYNC_RPC_MAX_ASYNCCALL_PERMIT_KEY =
+      FEDERATION_ROUTER_ASYNC_RPC_PREFIX + "max.asynccall.permit";
+  public static final int DFS_ROUTER_ASYNC_RPC_MAX_ASYNC_CALL_PERMIT_DEFAULT = 20000;
 
 
   public static final String DFS_ROUTER_METRICS_ENABLE =
   public static final String DFS_ROUTER_METRICS_ENABLE =
       FEDERATION_ROUTER_PREFIX + "metrics.enable";
       FEDERATION_ROUTER_PREFIX + "metrics.enable";

+ 9 - 9
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java

@@ -1024,7 +1024,7 @@ public class RouterRpcClient {
       throws IOException {
       throws IOException {
     UserGroupInformation ugi = RouterRpcServer.getRemoteUser();
     UserGroupInformation ugi = RouterRpcServer.getRemoteUser();
     RouterRpcFairnessPolicyController controller = getRouterRpcFairnessPolicyController();
     RouterRpcFairnessPolicyController controller = getRouterRpcFairnessPolicyController();
-    acquirePermit(nsId, ugi, method, controller);
+    acquirePermit(nsId, ugi, method.getMethodName(), controller);
     try {
     try {
       boolean isObserverRead = isObserverReadEligible(nsId, method.getMethod());
       boolean isObserverRead = isObserverReadEligible(nsId, method.getMethod());
       List<? extends FederationNamenodeContext> nns = getOrderedNamenodes(nsId, isObserverRead);
       List<? extends FederationNamenodeContext> nns = getOrderedNamenodes(nsId, isObserverRead);
@@ -1199,7 +1199,7 @@ public class RouterRpcClient {
       boolean isObserverRead = isObserverReadEligible(ns, m);
       boolean isObserverRead = isObserverReadEligible(ns, m);
       List<? extends FederationNamenodeContext> namenodes =
       List<? extends FederationNamenodeContext> namenodes =
           getOrderedNamenodes(ns, isObserverRead);
           getOrderedNamenodes(ns, isObserverRead);
-      acquirePermit(ns, ugi, remoteMethod, controller);
+      acquirePermit(ns, ugi, remoteMethod.getMethodName(), controller);
       try {
       try {
         Class<?> proto = remoteMethod.getProtocol();
         Class<?> proto = remoteMethod.getProtocol();
         Object[] params = remoteMethod.getParams(loc);
         Object[] params = remoteMethod.getParams(loc);
@@ -1579,7 +1579,7 @@ public class RouterRpcClient {
       return invokeSingle(locations.iterator().next(), method);
       return invokeSingle(locations.iterator().next(), method);
     }
     }
     RouterRpcFairnessPolicyController controller = getRouterRpcFairnessPolicyController();
     RouterRpcFairnessPolicyController controller = getRouterRpcFairnessPolicyController();
-    acquirePermit(CONCURRENT_NS, ugi, method, controller);
+    acquirePermit(CONCURRENT_NS, ugi, method.getMethodName(), controller);
 
 
     List<T> orderedLocations = new ArrayList<>();
     List<T> orderedLocations = new ArrayList<>();
     List<Callable<Object>> callables = new ArrayList<>();
     List<Callable<Object>> callables = new ArrayList<>();
@@ -1758,7 +1758,7 @@ public class RouterRpcClient {
     final List<? extends FederationNamenodeContext> namenodes =
     final List<? extends FederationNamenodeContext> namenodes =
         getOrderedNamenodes(ns, isObserverRead);
         getOrderedNamenodes(ns, isObserverRead);
     RouterRpcFairnessPolicyController controller = getRouterRpcFairnessPolicyController();
     RouterRpcFairnessPolicyController controller = getRouterRpcFairnessPolicyController();
-    acquirePermit(ns, ugi, method, controller);
+    acquirePermit(ns, ugi, method.getMethodName(), controller);
     try {
     try {
       Class<?> proto = method.getProtocol();
       Class<?> proto = method.getProtocol();
       Object[] paramList = method.getParams(location);
       Object[] paramList = method.getParams(location);
@@ -1829,12 +1829,12 @@ public class RouterRpcClient {
    *
    *
    * @param nsId Identifier of the block pool.
    * @param nsId Identifier of the block pool.
    * @param ugi UserGroupIdentifier associated with the user.
    * @param ugi UserGroupIdentifier associated with the user.
-   * @param m Remote method that needs to be invoked.
+   * @param methodName The name of remote method that needs to be invoked.
    * @param controller fairness policy controller to acquire permit from
    * @param controller fairness policy controller to acquire permit from
    * @throws IOException If permit could not be acquired for the nsId.
    * @throws IOException If permit could not be acquired for the nsId.
    */
    */
   protected void acquirePermit(final String nsId, final UserGroupInformation ugi,
   protected void acquirePermit(final String nsId, final UserGroupInformation ugi,
-      final RemoteMethod m, RouterRpcFairnessPolicyController controller)
+      final String methodName, RouterRpcFairnessPolicyController controller)
       throws IOException {
       throws IOException {
     if (controller != null) {
     if (controller != null) {
       if (!controller.acquirePermit(nsId)) {
       if (!controller.acquirePermit(nsId)) {
@@ -1845,7 +1845,7 @@ public class RouterRpcClient {
         }
         }
         incrRejectedPermitForNs(nsId);
         incrRejectedPermitForNs(nsId);
         LOG.debug("Permit denied for ugi: {} for method: {}",
         LOG.debug("Permit denied for ugi: {} for method: {}",
-            ugi, m.getMethodName());
+            ugi, methodName);
         String msg =
         String msg =
             "Router " + router.getRouterId() +
             "Router " + router.getRouterId() +
                 " is overloaded for NS: " + nsId;
                 " is overloaded for NS: " + nsId;
@@ -1880,7 +1880,7 @@ public class RouterRpcClient {
     return routerRpcFairnessPolicyController;
     return routerRpcFairnessPolicyController;
   }
   }
 
 
-  private void incrRejectedPermitForNs(String ns) {
+  protected void incrRejectedPermitForNs(String ns) {
     rejectedPermitsPerNs.computeIfAbsent(ns, k -> new LongAdder()).increment();
     rejectedPermitsPerNs.computeIfAbsent(ns, k -> new LongAdder()).increment();
   }
   }
 
 
@@ -1889,7 +1889,7 @@ public class RouterRpcClient {
         rejectedPermitsPerNs.get(ns).longValue() : 0L;
         rejectedPermitsPerNs.get(ns).longValue() : 0L;
   }
   }
 
 
-  private void incrAcceptedPermitForNs(String ns) {
+  protected void incrAcceptedPermitForNs(String ns) {
     acceptedPermitsPerNs.computeIfAbsent(ns, k -> new LongAdder()).increment();
     acceptedPermitsPerNs.computeIfAbsent(ns, k -> new LongAdder()).increment();
   }
   }
 
 

+ 31 - 32
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncRpcClient.java

@@ -58,7 +58,6 @@ import java.util.Map;
 import java.util.concurrent.Callable;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletableFuture;
 
 
-import static org.apache.hadoop.hdfs.server.federation.fairness.RouterRpcFairnessConstants.CONCURRENT_NS;
 import static org.apache.hadoop.hdfs.server.federation.router.async.utils.Async.warpCompletionException;
 import static org.apache.hadoop.hdfs.server.federation.router.async.utils.Async.warpCompletionException;
 import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncApply;
 import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncApply;
 import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncApplyUseExecutor;
 import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncApplyUseExecutor;
@@ -178,8 +177,14 @@ public class RouterAsyncRpcClient extends RouterRpcClient{
             namenodes.toString(), params);
             namenodes.toString(), params);
       }
       }
       threadLocalContext.transfer();
       threadLocalContext.transfer();
+      RouterRpcFairnessPolicyController controller = getRouterRpcFairnessPolicyController();
+      acquirePermit(nsid, ugi, method.getName(), controller);
       invokeMethodAsync(ugi, (List<FederationNamenodeContext>) namenodes,
       invokeMethodAsync(ugi, (List<FederationNamenodeContext>) namenodes,
           useObserver, protocol, method, params);
           useObserver, protocol, method, params);
+      asyncFinally(object -> {
+        releasePermit(nsid, ugi, method, controller);
+        return object;
+      });
     }, router.getRpcServer().getAsyncRouterHandlerExecutors().getOrDefault(nsid,
     }, router.getRpcServer().getAsyncRouterHandlerExecutors().getOrDefault(nsid,
         router.getRpcServer().getRouterAsyncHandlerDefaultExecutor()));
         router.getRpcServer().getRouterAsyncHandlerDefaultExecutor()));
     return null;
     return null;
@@ -227,7 +232,7 @@ public class RouterAsyncRpcClient extends RouterRpcClient{
             connection[0] = getConnection(ugi, nsId, rpcAddress, protocol);
             connection[0] = getConnection(ugi, nsId, rpcAddress, protocol);
             NameNodeProxiesClient.ProxyAndInfo<?> client = connection[0].getClient();
             NameNodeProxiesClient.ProxyAndInfo<?> client = connection[0].getClient();
             invoke(namenode, status.isShouldUseObserver(), 0, method,
             invoke(namenode, status.isShouldUseObserver(), 0, method,
-                  client.getProxy(), params);
+                client.getProxy(), params);
             asyncApply(res -> {
             asyncApply(res -> {
               status.setComplete(true);
               status.setComplete(true);
               postProcessResult(method, status, namenode, nsId, client);
               postProcessResult(method, status, namenode, nsId, client);
@@ -363,7 +368,6 @@ public class RouterAsyncRpcClient extends RouterRpcClient{
       Class<T> expectedResultClass, Object expectedResultValue)
       Class<T> expectedResultClass, Object expectedResultValue)
       throws IOException {
       throws IOException {
 
 
-    RouterRpcFairnessPolicyController controller = getRouterRpcFairnessPolicyController();
     final UserGroupInformation ugi = RouterRpcServer.getRemoteUser();
     final UserGroupInformation ugi = RouterRpcServer.getRemoteUser();
     final Method m = remoteMethod.getMethod();
     final Method m = remoteMethod.getMethod();
     List<IOException> thrownExceptions = new ArrayList<>();
     List<IOException> thrownExceptions = new ArrayList<>();
@@ -378,7 +382,6 @@ public class RouterAsyncRpcClient extends RouterRpcClient{
           boolean isObserverRead = isObserverReadEligible(ns, m);
           boolean isObserverRead = isObserverReadEligible(ns, m);
           List<? extends FederationNamenodeContext> namenodes =
           List<? extends FederationNamenodeContext> namenodes =
               getOrderedNamenodes(ns, isObserverRead);
               getOrderedNamenodes(ns, isObserverRead);
-          acquirePermit(ns, ugi, remoteMethod, controller);
           asyncTry(() -> {
           asyncTry(() -> {
             Class<?> proto = remoteMethod.getProtocol();
             Class<?> proto = remoteMethod.getProtocol();
             Object[] params = remoteMethod.getParams(loc);
             Object[] params = remoteMethod.getParams(loc);
@@ -419,10 +422,6 @@ public class RouterAsyncRpcClient extends RouterRpcClient{
             }
             }
             return ret;
             return ret;
           }, Exception.class);
           }, Exception.class);
-          asyncFinally(ret -> {
-            releasePermit(ns, ugi, remoteMethod, controller);
-            return ret;
-          });
         });
         });
     asyncApply(result -> {
     asyncApply(result -> {
       if (status.isComplete()) {
       if (status.isComplete()) {
@@ -498,7 +497,6 @@ public class RouterAsyncRpcClient extends RouterRpcClient{
   protected <T extends RemoteLocationContext, R> List<RemoteResult<T, R>> getRemoteResults(
   protected <T extends RemoteLocationContext, R> List<RemoteResult<T, R>> getRemoteResults(
       RemoteMethod method, long timeOutMs, RouterRpcFairnessPolicyController controller,
       RemoteMethod method, long timeOutMs, RouterRpcFairnessPolicyController controller,
       List<T> orderedLocations, List<Callable<Object>> callables) throws IOException {
       List<T> orderedLocations, List<Callable<Object>> callables) throws IOException {
-    final UserGroupInformation ugi = RouterRpcServer.getRemoteUser();
     final Method m = method.getMethod();
     final Method m = method.getMethod();
     final CompletableFuture<Object>[] futures =
     final CompletableFuture<Object>[] futures =
         new CompletableFuture[callables.size()];
         new CompletableFuture[callables.size()];
@@ -523,8 +521,6 @@ public class RouterAsyncRpcClient extends RouterRpcClient{
             LOG.error("Unexpected error while invoking API: {}", e.getMessage());
             LOG.error("Unexpected error while invoking API: {}", e.getMessage());
             throw warpCompletionException(new IOException(
             throw warpCompletionException(new IOException(
                 "Unexpected error while invoking API " + e.getMessage(), e));
                 "Unexpected error while invoking API " + e.getMessage(), e));
-          } finally {
-            releasePermit(CONCURRENT_NS, ugi, method, controller);
           }
           }
         }));
         }));
     return asyncReturn(List.class);
     return asyncReturn(List.class);
@@ -553,8 +549,6 @@ public class RouterAsyncRpcClient extends RouterRpcClient{
     boolean isObserverRead = isObserverReadEligible(ns, m);
     boolean isObserverRead = isObserverReadEligible(ns, m);
     final List<? extends FederationNamenodeContext> namenodes =
     final List<? extends FederationNamenodeContext> namenodes =
         getOrderedNamenodes(ns, isObserverRead);
         getOrderedNamenodes(ns, isObserverRead);
-    RouterRpcFairnessPolicyController controller = getRouterRpcFairnessPolicyController();
-    acquirePermit(ns, ugi, method, controller);
     asyncTry(() -> {
     asyncTry(() -> {
       Class<?> proto = method.getProtocol();
       Class<?> proto = method.getProtocol();
       Object[] paramList = method.getParams(location);
       Object[] paramList = method.getParams(location);
@@ -567,10 +561,6 @@ public class RouterAsyncRpcClient extends RouterRpcClient{
     asyncCatch((o, ioe) -> {
     asyncCatch((o, ioe) -> {
       throw processException(ioe, location);
       throw processException(ioe, location);
     }, IOException.class);
     }, IOException.class);
-    asyncFinally(o -> {
-      releasePermit(ns, ugi, method, controller);
-      return o;
-    });
     return asyncReturn(List.class);
     return asyncReturn(List.class);
   }
   }
 
 
@@ -589,21 +579,13 @@ public class RouterAsyncRpcClient extends RouterRpcClient{
   public Object invokeSingle(final String nsId, RemoteMethod method)
   public Object invokeSingle(final String nsId, RemoteMethod method)
       throws IOException {
       throws IOException {
     UserGroupInformation ugi = RouterRpcServer.getRemoteUser();
     UserGroupInformation ugi = RouterRpcServer.getRemoteUser();
-    RouterRpcFairnessPolicyController controller = getRouterRpcFairnessPolicyController();
-    acquirePermit(nsId, ugi, method, controller);
-    asyncTry(() -> {
-      boolean isObserverRead = isObserverReadEligible(nsId, method.getMethod());
-      List<? extends FederationNamenodeContext> nns = getOrderedNamenodes(nsId, isObserverRead);
-      RemoteLocationContext loc = new RemoteLocation(nsId, "/", "/");
-      Class<?> proto = method.getProtocol();
-      Method m = method.getMethod();
-      Object[] params = method.getParams(loc);
-      invokeMethod(ugi, nns, isObserverRead, proto, m, params);
-    });
-    asyncFinally(o -> {
-      releasePermit(nsId, ugi, method, controller);
-      return o;
-    });
+    boolean isObserverRead = isObserverReadEligible(nsId, method.getMethod());
+    List<? extends FederationNamenodeContext> nns = getOrderedNamenodes(nsId, isObserverRead);
+    RemoteLocationContext loc = new RemoteLocation(nsId, "/", "/");
+    Class<?> proto = method.getProtocol();
+    Method m = method.getMethod();
+    Object[] params = method.getParams(loc);
+    invokeMethod(ugi, nns, isObserverRead, proto, m, params);
     return null;
     return null;
   }
   }
 
 
@@ -627,4 +609,21 @@ public class RouterAsyncRpcClient extends RouterRpcClient{
     invokeSequential(locations, remoteMethod);
     invokeSequential(locations, remoteMethod);
     return asyncReturn(clazz);
     return asyncReturn(clazz);
   }
   }
+
+  /**
+   * Release permit for specific nsId after processing against downstream
+   * nsId is completed.
+   *  @param nsId Identifier of the block pool.
+   * @param ugi UserGroupIdentifier associated with the user.
+   * @param m Remote method that needs to be invoked.
+   * @param controller fairness policy controller to release permit from
+   */
+  protected void releasePermit(final String nsId, final UserGroupInformation ugi,
+      final Method m, RouterRpcFairnessPolicyController controller) {
+    if (controller != null) {
+      controller.releasePermit(nsId);
+      LOG.trace("Permit released for ugi: {} for method: {}", ugi,
+          m.getName());
+    }
+  }
 }
 }

+ 9 - 0
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml

@@ -152,6 +152,15 @@
     </description>
     </description>
   </property>
   </property>
 
 
+  <property>
+    <name>dfs.federation.router.async.rpc.max.asynccall.permit</name>
+    <value>20000</value>
+    <description>
+      Maximum number of asynchronous RPC requests the Router can send to
+      one downstream nameservice.
+    </description>
+  </property>
+
   <property>
   <property>
     <name>dfs.federation.router.connection.creator.queue-size</name>
     <name>dfs.federation.router.connection.creator.queue-size</name>
     <value>100</value>
     <value>100</value>

+ 160 - 0
hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/fairness/TestRouterAsyncRpcFairnessPolicyController.java

@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.federation.fairness;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.server.federation.router.FederationUtil;
+import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.util.Time;
+import org.junit.Test;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.hadoop.hdfs.server.federation.fairness.RouterRpcFairnessConstants.CONCURRENT_NS;
+import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ASYNC_RPC_MAX_ASYNCCALL_PERMIT_KEY;
+import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ASYNC_RPC_MAX_ASYNC_CALL_PERMIT_DEFAULT;
+import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FAIRNESS_ACQUIRE_TIMEOUT;
+import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_MONITOR_NAMENODE;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test functionality of {@link RouterAsyncRpcFairnessPolicyController).
+ */
+public class TestRouterAsyncRpcFairnessPolicyController {
+
+  private static String nameServices =
+      "ns1.nn1, ns1.nn2, ns2.nn1, ns2.nn2";
+  private static int perNsPermits = 30;
+
+  @Test
+  public void testHandlerAllocationEqualAssignment() {
+    RouterRpcFairnessPolicyController routerRpcFairnessPolicyController
+        = getFairnessPolicyController(perNsPermits);
+    verifyHandlerAllocation(routerRpcFairnessPolicyController);
+  }
+
+  @Test
+  public void testAcquireTimeout() {
+    Configuration conf = createConf(perNsPermits);
+    conf.setTimeDuration(DFS_ROUTER_FAIRNESS_ACQUIRE_TIMEOUT, 100, TimeUnit.MILLISECONDS);
+    RouterRpcFairnessPolicyController routerRpcFairnessPolicyController =
+        FederationUtil.newFairnessPolicyController(conf);
+
+    // Ns1 should have number of perNsPermits permits allocated.
+    for (int i = 0; i < perNsPermits; i++) {
+      assertTrue(routerRpcFairnessPolicyController.acquirePermit("ns1"));
+    }
+    long acquireBeginTimeMs = Time.monotonicNow();
+    assertFalse(routerRpcFairnessPolicyController.acquirePermit("ns1"));
+    long acquireTimeMs = Time.monotonicNow() - acquireBeginTimeMs;
+
+    // There are some other operations, so acquireTimeMs >= 100ms.
+    assertTrue(acquireTimeMs >= 100);
+  }
+
+  @Test
+  public void testAllocationSuccessfullyWithZeroHandlers() {
+    Configuration conf = createConf(0);
+    verifyInstantiationStatus(conf, DFS_ROUTER_ASYNC_RPC_MAX_ASYNC_CALL_PERMIT_DEFAULT);
+  }
+
+  @Test
+  public void testAllocationSuccessfullyWithNegativePermits() {
+    Configuration conf = createConf(-1);
+    verifyInstantiationStatus(conf, DFS_ROUTER_ASYNC_RPC_MAX_ASYNC_CALL_PERMIT_DEFAULT);
+  }
+
+  @Test
+  public void testGetAvailableHandlerOnPerNs() {
+    RouterRpcFairnessPolicyController routerRpcFairnessPolicyController
+        = getFairnessPolicyController(perNsPermits);
+    assertEquals("{\"concurrent\":30,\"ns2\":30,\"ns1\":30}",
+        routerRpcFairnessPolicyController.getAvailableHandlerOnPerNs());
+    routerRpcFairnessPolicyController.acquirePermit("ns1");
+    assertEquals("{\"concurrent\":30,\"ns2\":30,\"ns1\":29}",
+        routerRpcFairnessPolicyController.getAvailableHandlerOnPerNs());
+  }
+
+  @Test
+  public void testGetAvailableHandlerOnPerNsForNoFairness() {
+    Configuration conf = new Configuration();
+    RouterRpcFairnessPolicyController routerRpcFairnessPolicyController =
+        FederationUtil.newFairnessPolicyController(conf);
+    assertEquals("N/A",
+        routerRpcFairnessPolicyController.getAvailableHandlerOnPerNs());
+  }
+
+  private void verifyInstantiationStatus(Configuration conf, int permits) {
+    GenericTestUtils.LogCapturer logs = GenericTestUtils.LogCapturer
+        .captureLogs(LoggerFactory.getLogger(
+            RouterAsyncRpcFairnessPolicyController.class));
+    try {
+      FederationUtil.newFairnessPolicyController(conf);
+    } catch (IllegalArgumentException e) {
+      // Ignore the exception as it is expected here.
+    }
+    String infoMsg = String.format(
+        RouterAsyncRpcFairnessPolicyController.INIT_MSG, permits);
+    assertTrue("Should contain info message: " + infoMsg,
+        logs.getOutput().contains(infoMsg));
+  }
+
+  private RouterRpcFairnessPolicyController getFairnessPolicyController(
+      int asyncCallPermits) {
+    return FederationUtil.newFairnessPolicyController(createConf(asyncCallPermits));
+  }
+
+  private void verifyHandlerAllocation(
+      RouterRpcFairnessPolicyController routerRpcFairnessPolicyController) {
+    for (int i = 0; i < perNsPermits; i++) {
+      assertTrue(routerRpcFairnessPolicyController.acquirePermit("ns1"));
+      assertTrue(routerRpcFairnessPolicyController.acquirePermit("ns2"));
+      // CONCURRENT_NS doesn't acquire permits.
+      assertTrue(
+          routerRpcFairnessPolicyController.acquirePermit(CONCURRENT_NS));
+    }
+    assertFalse(routerRpcFairnessPolicyController.acquirePermit("ns1"));
+    assertFalse(routerRpcFairnessPolicyController.acquirePermit("ns2"));
+    assertTrue(routerRpcFairnessPolicyController.acquirePermit(CONCURRENT_NS));
+
+    routerRpcFairnessPolicyController.releasePermit("ns1");
+    routerRpcFairnessPolicyController.releasePermit("ns2");
+    routerRpcFairnessPolicyController.releasePermit(CONCURRENT_NS);
+
+    assertTrue(routerRpcFairnessPolicyController.acquirePermit("ns1"));
+    assertTrue(routerRpcFairnessPolicyController.acquirePermit("ns2"));
+    assertTrue(routerRpcFairnessPolicyController.acquirePermit(CONCURRENT_NS));
+  }
+
+  private Configuration createConf(int asyncCallPermits) {
+    Configuration conf = new HdfsConfiguration();
+    conf.setInt(DFS_ROUTER_ASYNC_RPC_MAX_ASYNCCALL_PERMIT_KEY, asyncCallPermits);
+    conf.set(DFS_ROUTER_MONITOR_NAMENODE, nameServices);
+    conf.setClass(
+        RBFConfigKeys.DFS_ROUTER_FAIRNESS_POLICY_CONTROLLER_CLASS,
+        RouterAsyncRpcFairnessPolicyController.class,
+        RouterRpcFairnessPolicyController.class);
+    return conf;
+  }
+}

+ 11 - 4
hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncRpc.java

@@ -20,6 +20,8 @@ package org.apache.hadoop.hdfs.server.federation.router.async;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster;
 import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster;
 import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
 import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
+import org.apache.hadoop.hdfs.server.federation.fairness.RouterAsyncRpcFairnessPolicyController;
+import org.apache.hadoop.hdfs.server.federation.fairness.RouterRpcFairnessPolicyController;
 import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;
 import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;
 import org.apache.hadoop.hdfs.server.federation.router.TestRouterRpc;
 import org.apache.hadoop.hdfs.server.federation.router.TestRouterRpc;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -30,6 +32,7 @@ import org.junit.Test;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeUnit;
 
 
 import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ASYNC_RPC_ENABLE_KEY;
 import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ASYNC_RPC_ENABLE_KEY;
+import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FAIRNESS_POLICY_CONTROLLER_CLASS;
 import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.syncReturn;
 import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.syncReturn;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertArrayEquals;
 
 
@@ -42,16 +45,20 @@ public class TestRouterAsyncRpc extends TestRouterRpc {
 
 
   @BeforeClass
   @BeforeClass
   public static void globalSetUp() throws Exception {
   public static void globalSetUp() throws Exception {
-    // Start routers with only an RPC service
+    // Start routers with only an RPC service.
     Configuration routerConf = new RouterConfigBuilder()
     Configuration routerConf = new RouterConfigBuilder()
         .metrics()
         .metrics()
         .rpc()
         .rpc()
         .build();
         .build();
-    // We decrease the DN cache times to make the test faster
+    // We decrease the DN cache times to make the test faster.
     routerConf.setTimeDuration(
     routerConf.setTimeDuration(
         RBFConfigKeys.DN_REPORT_CACHE_EXPIRE, 1, TimeUnit.SECONDS);
         RBFConfigKeys.DN_REPORT_CACHE_EXPIRE, 1, TimeUnit.SECONDS);
-    // use async router.
+    // Use async router.
     routerConf.setBoolean(DFS_ROUTER_ASYNC_RPC_ENABLE_KEY, true);
     routerConf.setBoolean(DFS_ROUTER_ASYNC_RPC_ENABLE_KEY, true);
+    // Use RouterAsyncRpcFairnessPolicyController as the fairness controller.
+    routerConf.setClass(DFS_ROUTER_FAIRNESS_POLICY_CONTROLLER_CLASS,
+        RouterAsyncRpcFairnessPolicyController.class,
+        RouterRpcFairnessPolicyController.class);
     setUp(routerConf);
     setUp(routerConf);
   }
   }
 
 
@@ -59,7 +66,7 @@ public class TestRouterAsyncRpc extends TestRouterRpc {
   public void testSetup() throws Exception {
   public void testSetup() throws Exception {
     super.testSetup();
     super.testSetup();
     cluster = super.getCluster();
     cluster = super.getCluster();
-    // Random router for this test
+    // Random router for this test.
     rndRouter = cluster.getRandomRouter();
     rndRouter = cluster.getRandomRouter();
   }
   }
 
 

+ 7 - 0
hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncRpcMultiDestination.java

@@ -20,6 +20,8 @@ package org.apache.hadoop.hdfs.server.federation.router.async;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster;
 import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster;
 import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
 import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
+import org.apache.hadoop.hdfs.server.federation.fairness.RouterAsyncRpcFairnessPolicyController;
+import org.apache.hadoop.hdfs.server.federation.fairness.RouterRpcFairnessPolicyController;
 import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;
 import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;
 import org.apache.hadoop.hdfs.server.federation.router.TestRouterRpcMultiDestination;
 import org.apache.hadoop.hdfs.server.federation.router.TestRouterRpcMultiDestination;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -29,6 +31,7 @@ import org.junit.Test;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeUnit;
 
 
 import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ASYNC_RPC_ENABLE_KEY;
 import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ASYNC_RPC_ENABLE_KEY;
+import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FAIRNESS_POLICY_CONTROLLER_CLASS;
 import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.syncReturn;
 import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.syncReturn;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertArrayEquals;
 
 
@@ -49,6 +52,10 @@ public class TestRouterAsyncRpcMultiDestination extends TestRouterRpcMultiDestin
         RBFConfigKeys.DN_REPORT_CACHE_EXPIRE, 1, TimeUnit.SECONDS);
         RBFConfigKeys.DN_REPORT_CACHE_EXPIRE, 1, TimeUnit.SECONDS);
     // use async router.
     // use async router.
     routerConf.setBoolean(DFS_ROUTER_ASYNC_RPC_ENABLE_KEY, true);
     routerConf.setBoolean(DFS_ROUTER_ASYNC_RPC_ENABLE_KEY, true);
+    // Use RouterAsyncRpcFairnessPolicyController as the fairness controller.
+    routerConf.setClass(DFS_ROUTER_FAIRNESS_POLICY_CONTROLLER_CLASS,
+        RouterAsyncRpcFairnessPolicyController.class,
+        RouterRpcFairnessPolicyController.class);
     setUp(routerConf);
     setUp(routerConf);
   }
   }