|
@@ -67,11 +67,17 @@ public class RequestThrottlerTest extends ZKTestCase {
|
|
|
|
|
|
CountDownLatch disconnected = null;
|
|
|
|
|
|
+ CountDownLatch throttled = null;
|
|
|
+ CountDownLatch throttling = null;
|
|
|
+
|
|
|
ZooKeeperServer zks = null;
|
|
|
ServerCnxnFactory f = null;
|
|
|
ZooKeeper zk = null;
|
|
|
int connectionLossCount = 0;
|
|
|
|
|
|
+ private long getCounterMetric(String name) {
|
|
|
+ return (long) MetricsUtils.currentServerMetrics().get(name);
|
|
|
+ }
|
|
|
|
|
|
@BeforeEach
|
|
|
public void setup() throws Exception {
|
|
@@ -115,6 +121,11 @@ public class RequestThrottlerTest extends ZKTestCase {
|
|
|
super(snapDir, logDir, tickTime);
|
|
|
}
|
|
|
|
|
|
+ @Override
|
|
|
+ protected RequestThrottler createRequestThrottler() {
|
|
|
+ return new TestRequestThrottler(this);
|
|
|
+ }
|
|
|
+
|
|
|
@Override
|
|
|
protected void setupRequestProcessors() {
|
|
|
RequestProcessor finalProcessor = new FinalRequestProcessor(this);
|
|
@@ -141,6 +152,24 @@ public class RequestThrottlerTest extends ZKTestCase {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ class TestRequestThrottler extends RequestThrottler {
|
|
|
+ public TestRequestThrottler(ZooKeeperServer zks) {
|
|
|
+ super(zks);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ synchronized void throttleSleep(int stallTime) throws InterruptedException {
|
|
|
+ if (throttling != null) {
|
|
|
+ throttling.countDown();
|
|
|
+ }
|
|
|
+ super.throttleSleep(stallTime);
|
|
|
+ // Defend against unstable timing and potential spurious wakeup.
|
|
|
+ if (throttled != null) {
|
|
|
+ assertTrue(throttled.await(20, TimeUnit.SECONDS));
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
class TestPrepRequestProcessor extends PrepRequestProcessor {
|
|
|
|
|
|
public TestPrepRequestProcessor(ZooKeeperServer zks, RequestProcessor syncProcessor) {
|
|
@@ -191,18 +220,22 @@ public class RequestThrottlerTest extends ZKTestCase {
|
|
|
|
|
|
// make sure the server received all 5 requests
|
|
|
submitted.await(5, TimeUnit.SECONDS);
|
|
|
- Map<String, Object> metrics = MetricsUtils.currentServerMetrics();
|
|
|
|
|
|
// but only two requests can get into the pipeline because of the throttler
|
|
|
- assertEquals(2L, (long) metrics.get("prep_processor_request_queued"));
|
|
|
- assertEquals(1L, (long) metrics.get("request_throttle_wait_count"));
|
|
|
+ WaitForCondition requestQueued = () -> getCounterMetric("prep_processor_request_queued") == 2;
|
|
|
+ waitFor("request not queued", requestQueued, 5);
|
|
|
+
|
|
|
+ WaitForCondition throttleWait = () -> getCounterMetric("request_throttle_wait_count") >= 1;
|
|
|
+ waitFor("no throttle wait", throttleWait, 5);
|
|
|
|
|
|
// let the requests go through the pipeline and the throttler will be waken up to allow more requests
|
|
|
// to enter the pipeline
|
|
|
resumeProcess.countDown();
|
|
|
- entered.await(STALL_TIME, TimeUnit.MILLISECONDS);
|
|
|
|
|
|
- metrics = MetricsUtils.currentServerMetrics();
|
|
|
+ // wait for more than one STALL_TIME to reduce timeout before wakeup
|
|
|
+ assertTrue(entered.await(STALL_TIME + 5000, TimeUnit.MILLISECONDS));
|
|
|
+
|
|
|
+ Map<String, Object> metrics = MetricsUtils.currentServerMetrics();
|
|
|
assertEquals(TOTAL_REQUESTS, (long) metrics.get("prep_processor_request_queued"));
|
|
|
}
|
|
|
|
|
@@ -221,6 +254,9 @@ public class RequestThrottlerTest extends ZKTestCase {
|
|
|
resumeProcess = new CountDownLatch(1);
|
|
|
submitted = new CountDownLatch(TOTAL_REQUESTS);
|
|
|
|
|
|
+ throttled = new CountDownLatch(1);
|
|
|
+ throttling = new CountDownLatch(1);
|
|
|
+
|
|
|
// send 5 requests asynchronously
|
|
|
for (int i = 0; i < TOTAL_REQUESTS; i++) {
|
|
|
zk.create("/request_throttle_test- " + i, ("/request_throttle_test- "
|
|
@@ -231,11 +267,18 @@ public class RequestThrottlerTest extends ZKTestCase {
|
|
|
// make sure the server received all 5 requests
|
|
|
assertTrue(submitted.await(5, TimeUnit.SECONDS));
|
|
|
|
|
|
+ // stale throttled requests
|
|
|
+ assertTrue(throttling.await(5, TimeUnit.SECONDS));
|
|
|
for (ServerCnxn cnxn : f.cnxns) {
|
|
|
cnxn.setStale();
|
|
|
}
|
|
|
+ throttled.countDown();
|
|
|
zk = null;
|
|
|
|
|
|
+ // only first three requests are counted as finished
|
|
|
+ finished = new CountDownLatch(3);
|
|
|
+
|
|
|
+ // let the requests go through the pipeline
|
|
|
resumeProcess.countDown();
|
|
|
LOG.info("raise the latch");
|
|
|
|
|
@@ -243,6 +286,8 @@ public class RequestThrottlerTest extends ZKTestCase {
|
|
|
Thread.sleep(50);
|
|
|
}
|
|
|
|
|
|
+ assertTrue(finished.await(5, TimeUnit.SECONDS));
|
|
|
+
|
|
|
// assert after all requests processed to avoid concurrent issues as metrics are
|
|
|
// counted in different threads.
|
|
|
Map<String, Object> metrics = MetricsUtils.currentServerMetrics();
|
|
@@ -327,7 +372,6 @@ public class RequestThrottlerTest extends ZKTestCase {
|
|
|
RequestThrottler.setMaxRequests(0);
|
|
|
resumeProcess = new CountDownLatch(1);
|
|
|
int totalRequests = 10;
|
|
|
- submitted = new CountDownLatch(totalRequests);
|
|
|
|
|
|
for (int i = 0; i < totalRequests; i++) {
|
|
|
zk.create("/request_throttle_test- " + i, ("/request_throttle_test- "
|
|
@@ -335,16 +379,16 @@ public class RequestThrottlerTest extends ZKTestCase {
|
|
|
}, null);
|
|
|
}
|
|
|
|
|
|
- submitted.await(5, TimeUnit.SECONDS);
|
|
|
-
|
|
|
// We should start throttling instead of queuing more requests.
|
|
|
//
|
|
|
// We always allow up to GLOBAL_OUTSTANDING_LIMIT + 1 number of requests coming in request processing pipeline
|
|
|
// before throttling. For the next request, we will throttle by disabling receiving future requests but we still
|
|
|
- // allow this single request coming in. So the total number of queued requests in processing pipeline would
|
|
|
+ // allow this single request coming in. Ideally, the total number of queued requests in processing pipeline would
|
|
|
// be GLOBAL_OUTSTANDING_LIMIT + 2.
|
|
|
- assertEquals(Integer.parseInt(GLOBAL_OUTSTANDING_LIMIT) + 2,
|
|
|
- (long) MetricsUtils.currentServerMetrics().get("prep_processor_request_queued"));
|
|
|
+ //
|
|
|
+ // But due to leak of consistent view of number of outstanding requests, the number could be larger.
|
|
|
+ WaitForCondition requestQueued = () -> getCounterMetric("prep_processor_request_queued") >= Integer.parseInt(GLOBAL_OUTSTANDING_LIMIT) + 2;
|
|
|
+ waitFor("no enough requests queued", requestQueued, 5);
|
|
|
|
|
|
resumeProcess.countDown();
|
|
|
} catch (Exception e) {
|