瀏覽代碼

HDFS-16417. StaticRouterRpcFairnessPolicyController init (#3871)

Felix Nguyen 3 年之前
父節點
當前提交
b88d5ae1c6

+ 2 - 1
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/StaticRouterRpcFairnessPolicyController.java

@@ -96,7 +96,8 @@ public class StaticRouterRpcFairnessPolicyController extends
     }
     }
 
 
     // Assign remaining handlers if any to fan out calls.
     // Assign remaining handlers if any to fan out calls.
-    int leftOverHandlers = handlerCount % unassignedNS.size();
+    int leftOverHandlers = unassignedNS.isEmpty() ? handlerCount :
+        handlerCount % unassignedNS.size();
     int existingPermits = getAvailablePermits(CONCURRENT_NS);
     int existingPermits = getAvailablePermits(CONCURRENT_NS);
     if (leftOverHandlers > 0) {
     if (leftOverHandlers > 0) {
       LOG.info("Assigned extra {} handlers to commons pool", leftOverHandlers);
       LOG.info("Assigned extra {} handlers to commons pool", leftOverHandlers);

+ 24 - 0
hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/fairness/TestRouterRpcFairnessPolicyController.java

@@ -129,6 +129,30 @@ public class TestRouterRpcFairnessPolicyController {
     verifyInstantiationError(conf, 1, 4);
     verifyInstantiationError(conf, 1, 4);
   }
   }
 
 
+  @Test
+  public void testHandlerAllocationConcurrentConfigured() {
+    Configuration conf = createConf(5);
+    conf.setInt(DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX + "ns1", 1);
+    conf.setInt(DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX + "ns2", 1);
+    conf.setInt(DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX + "concurrent", 1);
+    RouterRpcFairnessPolicyController routerRpcFairnessPolicyController =
+        FederationUtil.newFairnessPolicyController(conf);
+
+    // ns1, ns2 should have 1 permit each
+    assertTrue(routerRpcFairnessPolicyController.acquirePermit("ns1"));
+    assertTrue(routerRpcFairnessPolicyController.acquirePermit("ns2"));
+    assertFalse(routerRpcFairnessPolicyController.acquirePermit("ns1"));
+    assertFalse(routerRpcFairnessPolicyController.acquirePermit("ns2"));
+
+    // concurrent should have 3 permits
+    for (int i=0; i<3; i++) {
+      assertTrue(
+          routerRpcFairnessPolicyController.acquirePermit(CONCURRENT_NS));
+    }
+    assertFalse(routerRpcFairnessPolicyController.acquirePermit(CONCURRENT_NS));
+  }
+
+
   private void verifyInstantiationError(Configuration conf, int handlerCount,
   private void verifyInstantiationError(Configuration conf, int handlerCount,
       int totalDedicatedHandlers) {
       int totalDedicatedHandlers) {
     GenericTestUtils.LogCapturer logs = GenericTestUtils.LogCapturer
     GenericTestUtils.LogCapturer logs = GenericTestUtils.LogCapturer