|
@@ -17,6 +17,8 @@
|
|
*/
|
|
*/
|
|
package org.apache.hadoop.hdfs.server.federation.fairness;
|
|
package org.apache.hadoop.hdfs.server.federation.fairness;
|
|
|
|
|
|
|
|
+import static org.apache.hadoop.hdfs.server.federation.fairness.RouterRpcFairnessConstants.CONCURRENT_NS;
|
|
|
|
+import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FAIRNESS_ACQUIRE_TIMEOUT;
|
|
import static org.apache.hadoop.test.GenericTestUtils.assertExceptionContains;
|
|
import static org.apache.hadoop.test.GenericTestUtils.assertExceptionContains;
|
|
import static org.junit.Assert.assertEquals;
|
|
import static org.junit.Assert.assertEquals;
|
|
import static org.junit.Assert.assertTrue;
|
|
import static org.junit.Assert.assertTrue;
|
|
@@ -27,6 +29,10 @@ import java.lang.reflect.Field;
|
|
import java.net.URI;
|
|
import java.net.URI;
|
|
import java.util.ArrayList;
|
|
import java.util.ArrayList;
|
|
import java.util.Collection;
|
|
import java.util.Collection;
|
|
|
|
+import java.util.Arrays;
|
|
|
|
+import java.util.HashMap;
|
|
|
|
+import java.util.Map;
|
|
|
|
+import java.util.concurrent.TimeUnit;
|
|
import java.util.concurrent.atomic.AtomicInteger;
|
|
import java.util.concurrent.atomic.AtomicInteger;
|
|
|
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
import org.apache.hadoop.conf.Configuration;
|
|
@@ -46,18 +52,66 @@ import org.apache.hadoop.ipc.StandbyException;
|
|
import org.apache.hadoop.test.LambdaTestUtils;
|
|
import org.apache.hadoop.test.LambdaTestUtils;
|
|
import org.junit.After;
|
|
import org.junit.After;
|
|
import org.junit.Test;
|
|
import org.junit.Test;
|
|
|
|
+import org.junit.runner.RunWith;
|
|
|
|
+import org.junit.runners.Parameterized;
|
|
import org.slf4j.Logger;
|
|
import org.slf4j.Logger;
|
|
import org.slf4j.LoggerFactory;
|
|
import org.slf4j.LoggerFactory;
|
|
|
|
|
|
/**
|
|
/**
|
|
* Test the Router handlers fairness control rejects and accepts requests.
|
|
* Test the Router handlers fairness control rejects and accepts requests.
|
|
*/
|
|
*/
|
|
|
|
+@RunWith(Parameterized.class)
|
|
public class TestRouterHandlersFairness {
|
|
public class TestRouterHandlersFairness {
|
|
|
|
|
|
private static final Logger LOG =
|
|
private static final Logger LOG =
|
|
LoggerFactory.getLogger(TestRouterHandlersFairness.class);
|
|
LoggerFactory.getLogger(TestRouterHandlersFairness.class);
|
|
|
|
|
|
private StateStoreDFSCluster cluster;
|
|
private StateStoreDFSCluster cluster;
|
|
|
|
+ private Map<String, Integer> expectedHandlerPerNs;
|
|
|
|
+ private Class<RouterRpcFairnessPolicyController> policyControllerClass;
|
|
|
|
+ private int handlerCount;
|
|
|
|
+ private Map<String, String> configuration;
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Initialize test parameters.
|
|
|
|
+ *
|
|
|
|
+ * @param policyControllerClass RouterRpcFairnessPolicyController type.
|
|
|
|
+ * @param handlerCount The total number of handlers in the router.
|
|
|
|
+ * @param configuration Custom configuration.
|
|
|
|
+ * @param expectedHandlerPerNs The number of handlers expected for each ns.
|
|
|
|
+ */
|
|
|
|
+ public TestRouterHandlersFairness(
|
|
|
|
+ Class<RouterRpcFairnessPolicyController> policyControllerClass, int handlerCount,
|
|
|
|
+ Map<String, String> configuration, Map<String, Integer> expectedHandlerPerNs) {
|
|
|
|
+ this.expectedHandlerPerNs = expectedHandlerPerNs;
|
|
|
|
+ this.policyControllerClass = policyControllerClass;
|
|
|
|
+ this.handlerCount = handlerCount;
|
|
|
|
+ this.configuration = configuration;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Parameterized.Parameters
|
|
|
|
+ public static Collection primes() {
|
|
|
|
+ return Arrays.asList(new Object[][]{
|
|
|
|
+ {
|
|
|
|
+ // Test StaticRouterRpcFairnessPolicyController.
|
|
|
|
+ StaticRouterRpcFairnessPolicyController.class,
|
|
|
|
+ 3,
|
|
|
|
+ setConfiguration(null),
|
|
|
|
+ expectedHandlerPerNs("ns0:1, ns1:1, concurrent:1")
|
|
|
|
+ },
|
|
|
|
+ {
|
|
|
|
+ // Test ProportionRouterRpcFairnessPolicyController.
|
|
|
|
+ ProportionRouterRpcFairnessPolicyController.class,
|
|
|
|
+ 20,
|
|
|
|
+ setConfiguration(
|
|
|
|
+ "dfs.federation.router.fairness.handler.proportion.ns0=0.5, " +
|
|
|
|
+ "dfs.federation.router.fairness.handler.proportion.ns1=0.8, " +
|
|
|
|
+ "dfs.federation.router.fairness.handler.proportion.concurrent=1"
|
|
|
|
+ ),
|
|
|
|
+ expectedHandlerPerNs("ns0:10, ns1:16, concurrent:20")
|
|
|
|
+ }
|
|
|
|
+ });
|
|
|
|
+ }
|
|
|
|
|
|
@After
|
|
@After
|
|
public void cleanup() {
|
|
public void cleanup() {
|
|
@@ -69,6 +123,7 @@ public class TestRouterHandlersFairness {
|
|
|
|
|
|
private void setupCluster(boolean fairnessEnable, boolean ha)
|
|
private void setupCluster(boolean fairnessEnable, boolean ha)
|
|
throws Exception {
|
|
throws Exception {
|
|
|
|
+ LOG.info("Test {}", policyControllerClass.getSimpleName());
|
|
// Build and start a federated cluster
|
|
// Build and start a federated cluster
|
|
cluster = new StateStoreDFSCluster(ha, 2);
|
|
cluster = new StateStoreDFSCluster(ha, 2);
|
|
Configuration routerConf = new RouterConfigBuilder()
|
|
Configuration routerConf = new RouterConfigBuilder()
|
|
@@ -80,13 +135,17 @@ public class TestRouterHandlersFairness {
|
|
if (fairnessEnable) {
|
|
if (fairnessEnable) {
|
|
routerConf.setClass(
|
|
routerConf.setClass(
|
|
RBFConfigKeys.DFS_ROUTER_FAIRNESS_POLICY_CONTROLLER_CLASS,
|
|
RBFConfigKeys.DFS_ROUTER_FAIRNESS_POLICY_CONTROLLER_CLASS,
|
|
- StaticRouterRpcFairnessPolicyController.class,
|
|
|
|
|
|
+ this.policyControllerClass,
|
|
RouterRpcFairnessPolicyController.class);
|
|
RouterRpcFairnessPolicyController.class);
|
|
}
|
|
}
|
|
|
|
|
|
- // With two name services configured, each nameservice has 1 permit and
|
|
|
|
- // fan-out calls have 1 permit.
|
|
|
|
- routerConf.setInt(RBFConfigKeys.DFS_ROUTER_HANDLER_COUNT_KEY, 3);
|
|
|
|
|
|
+ routerConf.setTimeDuration(DFS_ROUTER_FAIRNESS_ACQUIRE_TIMEOUT, 10, TimeUnit.MILLISECONDS);
|
|
|
|
+
|
|
|
|
+ routerConf.setInt(RBFConfigKeys.DFS_ROUTER_HANDLER_COUNT_KEY, this.handlerCount);
|
|
|
|
+
|
|
|
|
+ for(Map.Entry<String, String> conf : configuration.entrySet()) {
|
|
|
|
+ routerConf.set(conf.getKey(), conf.getValue());
|
|
|
|
+ }
|
|
|
|
|
|
// Datanodes not needed for this test.
|
|
// Datanodes not needed for this test.
|
|
cluster.setNumDatanodesPerNameservice(0);
|
|
cluster.setNumDatanodesPerNameservice(0);
|
|
@@ -191,15 +250,19 @@ public class TestRouterHandlersFairness {
|
|
if (isConcurrent) {
|
|
if (isConcurrent) {
|
|
LOG.info("Taking fanout lock first");
|
|
LOG.info("Taking fanout lock first");
|
|
// take the lock for concurrent NS to block fanout calls
|
|
// take the lock for concurrent NS to block fanout calls
|
|
- assertTrue(routerContext.getRouter().getRpcServer()
|
|
|
|
- .getRPCClient().getRouterRpcFairnessPolicyController()
|
|
|
|
- .acquirePermit(RouterRpcFairnessConstants.CONCURRENT_NS));
|
|
|
|
|
|
+ for(int i = 0; i < expectedHandlerPerNs.get(CONCURRENT_NS); i++) {
|
|
|
|
+ assertTrue(routerContext.getRouter().getRpcServer()
|
|
|
|
+ .getRPCClient().getRouterRpcFairnessPolicyController()
|
|
|
|
+ .acquirePermit(CONCURRENT_NS));
|
|
|
|
+ }
|
|
} else {
|
|
} else {
|
|
for (String ns : cluster.getNameservices()) {
|
|
for (String ns : cluster.getNameservices()) {
|
|
LOG.info("Taking lock first for ns: {}", ns);
|
|
LOG.info("Taking lock first for ns: {}", ns);
|
|
- assertTrue(routerContext.getRouter().getRpcServer()
|
|
|
|
- .getRPCClient().getRouterRpcFairnessPolicyController()
|
|
|
|
- .acquirePermit(ns));
|
|
|
|
|
|
+ for(int i = 0; i < expectedHandlerPerNs.get(ns); i++) {
|
|
|
|
+ assertTrue(routerContext.getRouter().getRpcServer()
|
|
|
|
+ .getRPCClient().getRouterRpcFairnessPolicyController()
|
|
|
|
+ .acquirePermit(ns));
|
|
|
|
+ }
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
@@ -217,14 +280,18 @@ public class TestRouterHandlersFairness {
|
|
if (isConcurrent) {
|
|
if (isConcurrent) {
|
|
LOG.info("Release fanout lock that was taken before test");
|
|
LOG.info("Release fanout lock that was taken before test");
|
|
// take the lock for concurrent NS to block fanout calls
|
|
// take the lock for concurrent NS to block fanout calls
|
|
- routerContext.getRouter().getRpcServer()
|
|
|
|
- .getRPCClient().getRouterRpcFairnessPolicyController()
|
|
|
|
- .releasePermit(RouterRpcFairnessConstants.CONCURRENT_NS);
|
|
|
|
- } else {
|
|
|
|
- for (String ns : cluster.getNameservices()) {
|
|
|
|
|
|
+ for(int i = 0; i < expectedHandlerPerNs.get(CONCURRENT_NS); i++) {
|
|
routerContext.getRouter().getRpcServer()
|
|
routerContext.getRouter().getRpcServer()
|
|
.getRPCClient().getRouterRpcFairnessPolicyController()
|
|
.getRPCClient().getRouterRpcFairnessPolicyController()
|
|
- .releasePermit(ns);
|
|
|
|
|
|
+ .releasePermit(CONCURRENT_NS);
|
|
|
|
+ }
|
|
|
|
+ } else {
|
|
|
|
+ for (String ns : cluster.getNameservices()) {
|
|
|
|
+ for(int i = 0; i < expectedHandlerPerNs.get(ns); i++) {
|
|
|
|
+ routerContext.getRouter().getRpcServer()
|
|
|
|
+ .getRPCClient().getRouterRpcFairnessPolicyController()
|
|
|
|
+ .releasePermit(ns);
|
|
|
|
+ }
|
|
}
|
|
}
|
|
}
|
|
}
|
|
} else {
|
|
} else {
|
|
@@ -260,7 +327,7 @@ public class TestRouterHandlersFairness {
|
|
.getRejectedPermitForNs(ns);
|
|
.getRejectedPermitForNs(ns);
|
|
}
|
|
}
|
|
totalRejectedPermits += routerContext.getRouterRpcClient()
|
|
totalRejectedPermits += routerContext.getRouterRpcClient()
|
|
- .getRejectedPermitForNs(RouterRpcFairnessConstants.CONCURRENT_NS);
|
|
|
|
|
|
+ .getRejectedPermitForNs(CONCURRENT_NS);
|
|
return totalRejectedPermits;
|
|
return totalRejectedPermits;
|
|
}
|
|
}
|
|
|
|
|
|
@@ -271,7 +338,7 @@ public class TestRouterHandlersFairness {
|
|
.getAcceptedPermitForNs(ns);
|
|
.getAcceptedPermitForNs(ns);
|
|
}
|
|
}
|
|
totalAcceptedPermits += routerContext.getRouterRpcClient()
|
|
totalAcceptedPermits += routerContext.getRouterRpcClient()
|
|
- .getAcceptedPermitForNs(RouterRpcFairnessConstants.CONCURRENT_NS);
|
|
|
|
|
|
+ .getAcceptedPermitForNs(CONCURRENT_NS);
|
|
return totalAcceptedPermits;
|
|
return totalAcceptedPermits;
|
|
}
|
|
}
|
|
|
|
|
|
@@ -308,4 +375,36 @@ public class TestRouterHandlersFairness {
|
|
overloadException.get();
|
|
overloadException.get();
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+ private static Map<String, Integer> expectedHandlerPerNs(String str) {
|
|
|
|
+ Map<String, Integer> handlersPerNsMap = new HashMap<>();
|
|
|
|
+ if (str == null) {
|
|
|
|
+ return handlersPerNsMap;
|
|
|
|
+ }
|
|
|
|
+ String[] tmpStrs = str.split(", ");
|
|
|
|
+ for(String tmpStr : tmpStrs) {
|
|
|
|
+ String[] handlersPerNs = tmpStr.split(":");
|
|
|
|
+ if (handlersPerNs.length != 2) {
|
|
|
|
+ continue;
|
|
|
|
+ }
|
|
|
|
+ handlersPerNsMap.put(handlersPerNs[0], Integer.valueOf(handlersPerNs[1]));
|
|
|
|
+ }
|
|
|
|
+ return handlersPerNsMap;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private static Map<String, String> setConfiguration(String str) {
|
|
|
|
+ Map<String, String> conf = new HashMap<>();
|
|
|
|
+ if (str == null) {
|
|
|
|
+ return conf;
|
|
|
|
+ }
|
|
|
|
+ String[] tmpStrs = str.split(", ");
|
|
|
|
+ for(String tmpStr : tmpStrs) {
|
|
|
|
+ String[] configKV = tmpStr.split("=");
|
|
|
|
+ if (configKV.length != 2) {
|
|
|
|
+ continue;
|
|
|
|
+ }
|
|
|
|
+ conf.put(configKV[0], configKV[1]);
|
|
|
|
+ }
|
|
|
|
+ return conf;
|
|
|
|
+ }
|
|
}
|
|
}
|