|
@@ -229,12 +229,7 @@ public class RequestThrottlerTest extends ZKTestCase {
|
|
|
}
|
|
|
|
|
|
// make sure the server received all 5 requests
|
|
|
- submitted.await(5, TimeUnit.SECONDS);
|
|
|
- Map<String, Object> metrics = MetricsUtils.currentServerMetrics();
|
|
|
-
|
|
|
- // but only two requests can get into the pipeline because of the throttler
|
|
|
- assertEquals(2L, (long) metrics.get("prep_processor_request_queued"));
|
|
|
- assertEquals(1L, (long) metrics.get("request_throttle_wait_count"));
|
|
|
+ assertTrue(submitted.await(5, TimeUnit.SECONDS));
|
|
|
|
|
|
for (ServerCnxn cnxn : f.cnxns) {
|
|
|
cnxn.setStale();
|
|
@@ -248,10 +243,16 @@ public class RequestThrottlerTest extends ZKTestCase {
|
|
|
Thread.sleep(50);
|
|
|
}
|
|
|
|
|
|
+ // assert after all requests processed to avoid concurrent issues as metrics are
|
|
|
+ // counted in different threads.
|
|
|
+ Map<String, Object> metrics = MetricsUtils.currentServerMetrics();
|
|
|
+
|
|
|
+ // only two requests can get into the pipeline because of the throttler
|
|
|
+ assertEquals(2L, (long) metrics.get("prep_processor_request_queued"));
|
|
|
+
|
|
|
// the rest of the 3 requests will be dropped
|
|
|
// but only the first one for a connection will be counted
|
|
|
- metrics = MetricsUtils.currentServerMetrics();
|
|
|
- assertEquals(2L, (long) metrics.get("prep_processor_request_queued"));
|
|
|
+ assertEquals(1L, (long) metrics.get("request_throttle_wait_count"));
|
|
|
assertEquals(1, (long) metrics.get("stale_requests_dropped"));
|
|
|
}
|
|
|
|
|
@@ -261,13 +262,22 @@ public class RequestThrottlerTest extends ZKTestCase {
|
|
|
|
|
|
AsyncCallback.StringCallback createCallback = (rc, path, ctx, name) -> {
|
|
|
if (KeeperException.Code.get(rc) == KeeperException.Code.CONNECTIONLOSS) {
|
|
|
- disconnected.countDown();
|
|
|
connectionLossCount++;
|
|
|
+ disconnected.countDown();
|
|
|
}
|
|
|
};
|
|
|
|
|
|
- // we allow five requests in the pipeline
|
|
|
- RequestThrottler.setMaxRequests(5);
|
|
|
+ // the total length of the request is about 170-180 bytes, so only two requests are allowed
|
|
|
+ byte[] data = new byte[100];
|
|
|
+ // the third request will incur throttle. We don't send more requests to avoid reconnecting
|
|
|
+ // due to unstable test environment(e.g. slow sending).
|
|
|
+ int number_requests = 3;
|
|
|
+
|
|
|
+ // we allow more requests in the pipeline
|
|
|
+ RequestThrottler.setMaxRequests(number_requests + 2);
|
|
|
+
|
|
|
+ // request could become stale in processor threads due to throttle in io thread
|
|
|
+ RequestThrottler.setDropStaleRequests(false);
|
|
|
|
|
|
// enable large request throttling
|
|
|
zks.setLargeRequestThreshold(150);
|
|
@@ -277,34 +287,32 @@ public class RequestThrottlerTest extends ZKTestCase {
|
|
|
resumeProcess = new CountDownLatch(1);
|
|
|
// the connection will be close when large requests exceed the limit
|
|
|
// we can't use the submitted latch because requests after close won't be submitted
|
|
|
- disconnected = new CountDownLatch(TOTAL_REQUESTS);
|
|
|
-
|
|
|
- // the total length of the request is about 170-180 bytes, so only two requests are allowed
|
|
|
- byte[] data = new byte[100];
|
|
|
+ disconnected = new CountDownLatch(number_requests);
|
|
|
|
|
|
- // send 5 requests asynchronously
|
|
|
- for (int i = 0; i < TOTAL_REQUESTS; i++) {
|
|
|
+ // send requests asynchronously
|
|
|
+ for (int i = 0; i < number_requests; i++) {
|
|
|
zk.create("/request_throttle_test- " + i , data,
|
|
|
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, createCallback, null);
|
|
|
}
|
|
|
|
|
|
- // make sure the server received all 5 requests
|
|
|
- disconnected.await(30, TimeUnit.SECONDS);
|
|
|
+ // make sure the server received all requests
|
|
|
+ assertTrue(disconnected.await(30, TimeUnit.SECONDS));
|
|
|
+
|
|
|
+ finished = new CountDownLatch(2);
|
|
|
+ // let the requests go through the pipeline
|
|
|
+ resumeProcess.countDown();
|
|
|
+ assertTrue(finished.await(5, TimeUnit.SECONDS));
|
|
|
+
|
|
|
+ // assert metrics after finished so metrics in no io threads are set also.
|
|
|
Map<String, Object> metrics = MetricsUtils.currentServerMetrics();
|
|
|
|
|
|
// but only two requests can get into the pipeline because they are large requests
|
|
|
// the connection will be closed
|
|
|
assertEquals(2L, (long) metrics.get("prep_processor_request_queued"));
|
|
|
assertEquals(1L, (long) metrics.get("large_requests_rejected"));
|
|
|
- assertEquals(5, connectionLossCount);
|
|
|
-
|
|
|
- finished = new CountDownLatch(2);
|
|
|
- // let the requests go through the pipeline
|
|
|
- resumeProcess.countDown();
|
|
|
- finished.await(5, TimeUnit.SECONDS);
|
|
|
+ assertEquals(number_requests, connectionLossCount);
|
|
|
|
|
|
// when the two requests finish, they are stale because the connection is closed already
|
|
|
- metrics = MetricsUtils.currentServerMetrics();
|
|
|
assertEquals(2, (long) metrics.get("stale_replies"));
|
|
|
}
|
|
|
|