|
@@ -41,8 +41,7 @@ import org.slf4j.Logger;
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
|
|
|
/**
|
|
|
- * Test the Router handlers fairness control rejects
|
|
|
- * requests when the handlers are overloaded.
|
|
|
+ * Test the Router handlers fairness control rejects and accepts requests.
|
|
|
*/
|
|
|
public class TestRouterHandlersFairness {
|
|
|
|
|
@@ -126,6 +125,12 @@ public class TestRouterHandlersFairness {
|
|
|
throws Exception {
|
|
|
|
|
|
RouterContext routerContext = cluster.getRandomRouter();
|
|
|
+ URI address = routerContext.getFileSystemURI();
|
|
|
+ Configuration conf = new HdfsConfiguration();
|
|
|
+ final int numOps = 10;
|
|
|
+ AtomicInteger overloadException = new AtomicInteger();
|
|
|
+
|
|
|
+ // Test when handlers are overloaded
|
|
|
if (fairness) {
|
|
|
if (isConcurrent) {
|
|
|
LOG.info("Taking fanout lock first");
|
|
@@ -142,42 +147,11 @@ public class TestRouterHandlersFairness {
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
- URI address = routerContext.getFileSystemURI();
|
|
|
- Configuration conf = new HdfsConfiguration();
|
|
|
- final int numOps = 10;
|
|
|
- final AtomicInteger overloadException = new AtomicInteger();
|
|
|
int originalRejectedPermits = getTotalRejectedPermits(routerContext);
|
|
|
|
|
|
- for (int i = 0; i < numOps; i++) {
|
|
|
- DFSClient routerClient = null;
|
|
|
- try {
|
|
|
- routerClient = new DFSClient(address, conf);
|
|
|
- String clientName = routerClient.getClientName();
|
|
|
- ClientProtocol routerProto = routerClient.getNamenode();
|
|
|
- if (isConcurrent) {
|
|
|
- invokeConcurrent(routerProto, clientName);
|
|
|
- } else {
|
|
|
- invokeSequential(routerProto);
|
|
|
- }
|
|
|
- } catch (RemoteException re) {
|
|
|
- IOException ioe = re.unwrapRemoteException();
|
|
|
- assertTrue("Wrong exception: " + ioe,
|
|
|
- ioe instanceof StandbyException);
|
|
|
- assertExceptionContains("is overloaded for NS", ioe);
|
|
|
- overloadException.incrementAndGet();
|
|
|
- } catch (Throwable e) {
|
|
|
- throw e;
|
|
|
- } finally {
|
|
|
- if (routerClient != null) {
|
|
|
- try {
|
|
|
- routerClient.close();
|
|
|
- } catch (IOException e) {
|
|
|
- LOG.error("Cannot close the client");
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- overloadException.get();
|
|
|
- }
|
|
|
+ // |- All calls should fail since permits not released
|
|
|
+ innerCalls(address, numOps, isConcurrent, conf, overloadException);
|
|
|
+
|
|
|
int latestRejectedPermits = getTotalRejectedPermits(routerContext);
|
|
|
assertEquals(latestRejectedPermits - originalRejectedPermits,
|
|
|
overloadException.get());
|
|
@@ -201,6 +175,17 @@ public class TestRouterHandlersFairness {
|
|
|
assertEquals("Number of failed RPCs without fairness configured",
|
|
|
0, overloadException.get());
|
|
|
}
|
|
|
+
|
|
|
+ // Test when handlers are not overloaded
|
|
|
+ int originalAcceptedPermits = getTotalAcceptedPermits(routerContext);
|
|
|
+ overloadException = new AtomicInteger();
|
|
|
+
|
|
|
+ // |- All calls should succeed since permits not acquired
|
|
|
+ innerCalls(address, numOps, isConcurrent, conf, overloadException);
|
|
|
+
|
|
|
+ int latestAcceptedPermits = getTotalAcceptedPermits(routerContext);
|
|
|
+ assertEquals(latestAcceptedPermits - originalAcceptedPermits, numOps);
|
|
|
+ assertEquals(overloadException.get(), 0);
|
|
|
}
|
|
|
|
|
|
private void invokeSequential(ClientProtocol routerProto) throws IOException {
|
|
@@ -222,4 +207,49 @@ public class TestRouterHandlersFairness {
|
|
|
.getRejectedPermitForNs(RouterRpcFairnessConstants.CONCURRENT_NS);
|
|
|
return totalRejectedPermits;
|
|
|
}
|
|
|
+
|
|
|
+ private int getTotalAcceptedPermits(RouterContext routerContext) {
|
|
|
+ int totalAcceptedPermits = 0;
|
|
|
+ for (String ns : cluster.getNameservices()) {
|
|
|
+ totalAcceptedPermits += routerContext.getRouterRpcClient()
|
|
|
+ .getAcceptedPermitForNs(ns);
|
|
|
+ }
|
|
|
+ totalAcceptedPermits += routerContext.getRouterRpcClient()
|
|
|
+ .getAcceptedPermitForNs(RouterRpcFairnessConstants.CONCURRENT_NS);
|
|
|
+ return totalAcceptedPermits;
|
|
|
+ }
|
|
|
+
|
|
|
+ private void innerCalls(URI address, int numOps, boolean isConcurrent,
|
|
|
+ Configuration conf, AtomicInteger overloadException) throws IOException {
|
|
|
+ for (int i = 0; i < numOps; i++) {
|
|
|
+ DFSClient routerClient = null;
|
|
|
+ try {
|
|
|
+ routerClient = new DFSClient(address, conf);
|
|
|
+ String clientName = routerClient.getClientName();
|
|
|
+ ClientProtocol routerProto = routerClient.getNamenode();
|
|
|
+ if (isConcurrent) {
|
|
|
+ invokeConcurrent(routerProto, clientName);
|
|
|
+ } else {
|
|
|
+ invokeSequential(routerProto);
|
|
|
+ }
|
|
|
+ } catch (RemoteException re) {
|
|
|
+ IOException ioe = re.unwrapRemoteException();
|
|
|
+ assertTrue("Wrong exception: " + ioe,
|
|
|
+ ioe instanceof StandbyException);
|
|
|
+ assertExceptionContains("is overloaded for NS", ioe);
|
|
|
+ overloadException.incrementAndGet();
|
|
|
+ } catch (Throwable e) {
|
|
|
+ throw e;
|
|
|
+ } finally {
|
|
|
+ if (routerClient != null) {
|
|
|
+ try {
|
|
|
+ routerClient.close();
|
|
|
+ } catch (IOException e) {
|
|
|
+ LOG.error("Cannot close the client");
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ overloadException.get();
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|