|
@@ -23,12 +23,17 @@ package org.apache.bookkeeper.test;
|
|
|
|
|
|
import java.io.File;
|
|
|
import java.io.IOException;
|
|
|
+import java.lang.NoSuchFieldException;
|
|
|
+import java.lang.IllegalAccessException;
|
|
|
+import java.lang.reflect.Field;
|
|
|
import java.nio.ByteBuffer;
|
|
|
import java.nio.charset.Charset;
|
|
|
import java.util.ArrayList;
|
|
|
import java.util.Enumeration;
|
|
|
import java.util.Random;
|
|
|
import java.util.Set;
|
|
|
+import java.util.concurrent.Semaphore;
|
|
|
+
|
|
|
|
|
|
import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
|
|
|
import org.apache.bookkeeper.client.BKException;
|
|
@@ -83,7 +88,7 @@ public class BookieReadWriteTest extends BaseTestCase implements AddCallback, Re
|
|
|
Set<Object> syncObjs;
|
|
|
|
|
|
class SyncObj {
|
|
|
- int counter;
|
|
|
+ volatile int counter;
|
|
|
boolean value;
|
|
|
|
|
|
public SyncObj() {
|
|
@@ -237,19 +242,61 @@ public class BookieReadWriteTest extends BaseTestCase implements AddCallback, Re
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ class ThrottleTestCallback implements ReadCallback {
|
|
|
+ int throttle;
|
|
|
+
|
|
|
+ ThrottleTestCallback(int threshold){
|
|
|
+ this.throttle = threshold;
|
|
|
+ }
|
|
|
+
|
|
|
+ public void readComplete(int rc, LedgerHandle lh, Enumeration<LedgerEntry> seq, Object ctx){
|
|
|
+ if(rc != BKException.Code.OK){
|
|
|
+ fail("Return code is not OK: " + rc);
|
|
|
+ }
|
|
|
+
|
|
|
+ ls = seq;
|
|
|
+ synchronized(sync){
|
|
|
+ sync.counter += throttle;
|
|
|
+ sync.notify();
|
|
|
+ }
|
|
|
+ LOG.info("Current counter: " + sync.counter);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Method for obtaining the available permits of a ledger handle
|
|
|
+ * using reflection to avoid adding a new public method to the
|
|
|
+ * class.
|
|
|
+ *
|
|
|
+ * @param lh
|
|
|
+ * @return
|
|
|
+ */
|
|
|
+ @SuppressWarnings("unchecked")
|
|
|
+ int getAvailablePermits(LedgerHandle lh) throws
|
|
|
+ NoSuchFieldException, IllegalAccessException
|
|
|
+ {
|
|
|
+ Field field = LedgerHandle.class.getDeclaredField("opCounterSem");
|
|
|
+ field.setAccessible(true);
|
|
|
+ return ((Semaphore)field.get(lh)).availablePermits();
|
|
|
+ }
|
|
|
+
|
|
|
@Test
|
|
|
- public void testReadWriteAsyncSingleClientThrottle() throws IOException {
|
|
|
+ public void testReadWriteAsyncSingleClientThrottle() throws
|
|
|
+ IOException, NoSuchFieldException, IllegalAccessException {
|
|
|
try {
|
|
|
+
|
|
|
+ Integer throttle = 100;
|
|
|
+ ThrottleTestCallback tcb = new ThrottleTestCallback(throttle);
|
|
|
// Create a BookKeeper client and a ledger
|
|
|
- System.setProperty("throttle", "1000");
|
|
|
+ System.setProperty("throttle", throttle.toString());
|
|
|
bkc = new BookKeeper("127.0.0.1");
|
|
|
lh = bkc.createLedger(digestType, ledgerPassword);
|
|
|
// bkc.initMessageDigest("SHA1");
|
|
|
ledgerId = lh.getId();
|
|
|
LOG.info("Ledger ID: " + lh.getId());
|
|
|
|
|
|
- numEntriesToWrite = 20000;
|
|
|
- for (int i = 0; i < (numEntriesToWrite - 10000); i++) {
|
|
|
+ numEntriesToWrite = 8000;
|
|
|
+ for (int i = 0; i < (numEntriesToWrite - 2000); i++) {
|
|
|
ByteBuffer entry = ByteBuffer.allocate(4);
|
|
|
entry.putInt(rng.nextInt(maxInt));
|
|
|
entry.position(0);
|
|
@@ -257,10 +304,15 @@ public class BookieReadWriteTest extends BaseTestCase implements AddCallback, Re
|
|
|
entries.add(entry.array());
|
|
|
entriesSize.add(entry.array().length);
|
|
|
lh.asyncAddEntry(entry.array(), this, sync);
|
|
|
+ /*
|
|
|
+ * Check that the difference is no larger than the throttling threshold
|
|
|
+ */
|
|
|
+ int testValue = getAvailablePermits(lh);
|
|
|
+ assertTrue("Difference is incorrect : " + i + ", " + sync.counter + ", " + testValue, testValue <= throttle);
|
|
|
}
|
|
|
|
|
|
|
|
|
- for (int i = 0; i < 10000; i++) {
|
|
|
+ for (int i = 0; i < 2000; i++) {
|
|
|
ByteBuffer entry = ByteBuffer.allocate(4);
|
|
|
entry.putInt(rng.nextInt(maxInt));
|
|
|
entry.position(0);
|
|
@@ -268,6 +320,12 @@ public class BookieReadWriteTest extends BaseTestCase implements AddCallback, Re
|
|
|
entries.add(entry.array());
|
|
|
entriesSize.add(entry.array().length);
|
|
|
lh.asyncAddEntry(entry.array(), this, sync);
|
|
|
+
|
|
|
+ /*
|
|
|
+ * Check that the difference is no larger than the throttling threshold
|
|
|
+ */
|
|
|
+ int testValue = getAvailablePermits(lh);
|
|
|
+ assertTrue("Difference is incorrect : " + i + ", " + sync.counter + ", " + testValue, testValue <= throttle);
|
|
|
}
|
|
|
|
|
|
// wait for all entries to be acknowledged
|
|
@@ -290,35 +348,22 @@ public class BookieReadWriteTest extends BaseTestCase implements AddCallback, Re
|
|
|
assertTrue("Verifying number of entries written", lh.getLastAddConfirmed() == (numEntriesToWrite - 1));
|
|
|
|
|
|
// read entries
|
|
|
- lh.asyncReadEntries(0, numEntriesToWrite - 1, this, (Object) sync);
|
|
|
-
|
|
|
+ sync.counter = 0;
|
|
|
+ for (int i = 0; i < numEntriesToWrite; i+=throttle) {
|
|
|
+ lh.asyncReadEntries(i, i + throttle - 1, tcb, (Object) sync);
|
|
|
+ int testValue = getAvailablePermits(lh);
|
|
|
+ assertTrue("Difference is incorrect : " + i + ", " + sync.counter + ", " + testValue, testValue <= throttle);
|
|
|
+ }
|
|
|
+
|
|
|
synchronized (sync) {
|
|
|
- while (sync.value == false) {
|
|
|
+ while (sync.counter < numEntriesToWrite) {
|
|
|
+ LOG.info("Entries counter = " + sync.counter);
|
|
|
sync.wait();
|
|
|
}
|
|
|
}
|
|
|
|
|
|
LOG.debug("*** READ COMPLETE ***");
|
|
|
|
|
|
- // at this point, LedgerSequence ls is filled with the returned
|
|
|
- // values
|
|
|
- int i = 0;
|
|
|
- while (ls.hasMoreElements()) {
|
|
|
- ByteBuffer origbb = ByteBuffer.wrap(entries.get(i));
|
|
|
- Integer origEntry = origbb.getInt();
|
|
|
- byte[] entry = ls.nextElement().getEntry();
|
|
|
- ByteBuffer result = ByteBuffer.wrap(entry);
|
|
|
- LOG.debug("Length of result: " + result.capacity());
|
|
|
- LOG.debug("Original entry: " + origEntry);
|
|
|
-
|
|
|
- Integer retrEntry = result.getInt();
|
|
|
- LOG.debug("Retrieved entry: " + retrEntry);
|
|
|
- assertTrue("Checking entry " + i + " for equality", origEntry.equals(retrEntry));
|
|
|
- assertTrue("Checking entry " + i + " for size", entry.length == entriesSize.get(i).intValue());
|
|
|
- i++;
|
|
|
- }
|
|
|
- assertTrue("Checking number of read entries", i == numEntriesToWrite);
|
|
|
-
|
|
|
lh.close();
|
|
|
} catch (KeeperException e) {
|
|
|
LOG.error("Test failed", e);
|
|
@@ -565,7 +610,10 @@ public class BookieReadWriteTest extends BaseTestCase implements AddCallback, Re
|
|
|
}
|
|
|
|
|
|
public void addComplete(int rc, LedgerHandle lh, long entryId, Object ctx) {
|
|
|
+ if(rc != BKException.Code.OK) fail("Return code is not OK: " + rc);
|
|
|
+
|
|
|
SyncObj x = (SyncObj) ctx;
|
|
|
+
|
|
|
synchronized (x) {
|
|
|
x.counter++;
|
|
|
x.notify();
|
|
@@ -573,12 +621,14 @@ public class BookieReadWriteTest extends BaseTestCase implements AddCallback, Re
|
|
|
}
|
|
|
|
|
|
public void readComplete(int rc, LedgerHandle lh, Enumeration<LedgerEntry> seq, Object ctx) {
|
|
|
+ if(rc != BKException.Code.OK) fail("Return code is not OK: " + rc);
|
|
|
+
|
|
|
ls = seq;
|
|
|
+
|
|
|
synchronized (sync) {
|
|
|
sync.value = true;
|
|
|
sync.notify();
|
|
|
}
|
|
|
-
|
|
|
}
|
|
|
|
|
|
@Before
|