Explorar o código

ZOOKEEPER-719. Add throttling to BookKeeper client

git-svn-id: https://svn.apache.org/repos/asf/hadoop/zookeeper/trunk@962693 13f79535-47bb-0310-9956-ffa450edef68
Benjamin Reed %!s(int64=15) %!d(string=hai) anos
pai
achega
8953aabcb9

+ 2 - 0
CHANGES.txt

@@ -64,6 +64,8 @@ BUGFIXES:
   ZOOKEEPER-796. zkServer.sh should support an external PIDFILE variable
   ZOOKEEPER-796. zkServer.sh should support an external PIDFILE variable
   (Alex Newman via phunt)
   (Alex Newman via phunt)
 
 
+  ZOOKEEPER-719. Add throttling to BookKeeper client (fpj via breed)
+
 IMPROVEMENTS:
 IMPROVEMENTS:
   ZOOKEEPER-724. Improve junit test integration - log harness information 
   ZOOKEEPER-724. Improve junit test integration - log harness information 
   (phunt via mahadev)
   (phunt via mahadev)

+ 12 - 1
src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BKException.java

@@ -70,6 +70,8 @@ public abstract class BKException extends Exception {
             return new BKWriteException();
             return new BKWriteException();
         case Code.NoSuchEntryException:
         case Code.NoSuchEntryException:
             return new BKNoSuchEntryException();
             return new BKNoSuchEntryException();
+        case Code.IncorrectParameterException:
+            return new BKIncorrectParameterException();
         default:
         default:
             return new BKIllegalOpException();
             return new BKIllegalOpException();
         }
         }
@@ -94,7 +96,8 @@ public abstract class BKException extends Exception {
         int LedgerClosedException = -11;
         int LedgerClosedException = -11;
         int WriteException = -12;
         int WriteException = -12;
         int NoSuchEntryException = -13;
         int NoSuchEntryException = -13;
-
+        int IncorrectParameterException = -14;
+        
         int IllegalOpException = -100;
         int IllegalOpException = -100;
     }
     }
 
 
@@ -136,6 +139,8 @@ public abstract class BKException extends Exception {
             return "Write failed on bookie";
             return "Write failed on bookie";
         case Code.NoSuchEntryException:
         case Code.NoSuchEntryException:
             return "No such entry";
             return "No such entry";
+        case Code.IncorrectParameterException:
+            return "Incorrect parameter input";
         default:
         default:
             return "Invalid operation";
             return "Invalid operation";
         }
         }
@@ -224,4 +229,10 @@ public abstract class BKException extends Exception {
             super(Code.LedgerClosedException);
             super(Code.LedgerClosedException);
         }
         }
     }
     }
+    
+    public static class BKIncorrectParameterException extends BKException {
+        public BKIncorrectParameterException() {
+            super(Code.IncorrectParameterException);
+        }
+    }
 }
 }

+ 4 - 0
src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerCreateOp.java

@@ -145,6 +145,10 @@ class LedgerCreateOp implements StringCallback, StatCallback {
             LOG.error("Security exception while creating ledger: " + ledgerId, e);
             LOG.error("Security exception while creating ledger: " + ledgerId, e);
             cb.createComplete(BKException.Code.DigestNotInitializedException, null, this.ctx);
             cb.createComplete(BKException.Code.DigestNotInitializedException, null, this.ctx);
             return;
             return;
+        } catch (NumberFormatException e) {
+            LOG.error("Incorrectly entered parameter throttle: " + System.getProperty("throttle"), e);
+            cb.createComplete(BKException.Code.IncorrectParameterException, null, this.ctx);
+            return;
         }
         }
 
 
         lh.writeLedgerConfig(this, null);
         lh.writeLedgerConfig(this, null);

+ 22 - 8
src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerHandle.java

@@ -27,6 +27,8 @@ import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.ArrayList;
 import java.util.Enumeration;
 import java.util.Enumeration;
 import java.util.Queue;
 import java.util.Queue;
+import java.util.concurrent.Semaphore;
+
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
 import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
 import org.apache.bookkeeper.client.AsyncCallback.CloseCallback;
 import org.apache.bookkeeper.client.AsyncCallback.CloseCallback;
@@ -61,10 +63,14 @@ public class LedgerHandle implements ReadCallback, AddCallback, CloseCallback {
   final DigestManager macManager;
   final DigestManager macManager;
   final DistributionSchedule distributionSchedule;
   final DistributionSchedule distributionSchedule;
 
 
+  final Semaphore opCounterSem;
+  private Integer throttling = 5000;
+  
   final Queue<PendingAddOp> pendingAddOps = new ArrayDeque<PendingAddOp>();
   final Queue<PendingAddOp> pendingAddOps = new ArrayDeque<PendingAddOp>();
 
 
   LedgerHandle(BookKeeper bk, long ledgerId, LedgerMetadata metadata,
   LedgerHandle(BookKeeper bk, long ledgerId, LedgerMetadata metadata,
-      DigestType digestType, byte[] password) throws GeneralSecurityException {
+      DigestType digestType, byte[] password)
+      throws GeneralSecurityException, NumberFormatException {
     this.bk = bk;
     this.bk = bk;
     this.metadata = metadata;
     this.metadata = metadata;
     if (metadata.isClosed()) {
     if (metadata.isClosed()) {
@@ -72,14 +78,21 @@ public class LedgerHandle implements ReadCallback, AddCallback, CloseCallback {
     } else {
     } else {
       lastAddConfirmed = lastAddPushed = -1;
       lastAddConfirmed = lastAddPushed = -1;
     }
     }
-
+    
     this.ledgerId = ledgerId;
     this.ledgerId = ledgerId;
+    
+    String throttleValue = System.getProperty("throttle");
+    if(throttleValue != null){
+        this.throttling = new Integer(throttleValue); 
+    }
+    this.opCounterSem = new Semaphore(throttling);
+    
     macManager = DigestManager.instantiate(ledgerId, password, digestType);
     macManager = DigestManager.instantiate(ledgerId, password, digestType);
     this.ledgerKey = MacDigestManager.genDigest("ledger", password);
     this.ledgerKey = MacDigestManager.genDigest("ledger", password);
     distributionSchedule = new RoundRobinDistributionSchedule(
     distributionSchedule = new RoundRobinDistributionSchedule(
         metadata.quorumSize, metadata.ensembleSize);
         metadata.quorumSize, metadata.ensembleSize);
   }
   }
-
+  
   /**
   /**
    * Get the id of the current ledger
    * Get the id of the current ledger
    * 
    * 
@@ -219,7 +232,7 @@ public class LedgerHandle implements ReadCallback, AddCallback, CloseCallback {
    *          control object
    *          control object
    */
    */
   public void asyncReadEntries(long firstEntry, long lastEntry,
   public void asyncReadEntries(long firstEntry, long lastEntry,
-      ReadCallback cb, Object ctx) {
+      ReadCallback cb, Object ctx) throws InterruptedException {
     // Little sanity check
     // Little sanity check
     if (firstEntry < 0 || lastEntry > lastAddConfirmed
     if (firstEntry < 0 || lastEntry > lastAddConfirmed
         || firstEntry > lastEntry) {
         || firstEntry > lastEntry) {
@@ -228,7 +241,7 @@ public class LedgerHandle implements ReadCallback, AddCallback, CloseCallback {
     }
     }
 
 
     new PendingReadOp(this, firstEntry, lastEntry, cb, ctx).initiate();
     new PendingReadOp(this, firstEntry, lastEntry, cb, ctx).initiate();
-
+    opCounterSem.acquire();
   }
   }
 
 
   /**
   /**
@@ -260,8 +273,8 @@ public class LedgerHandle implements ReadCallback, AddCallback, CloseCallback {
    *          some control object
    *          some control object
    */
    */
   public void asyncAddEntry(final byte[] data, final AddCallback cb,
   public void asyncAddEntry(final byte[] data, final AddCallback cb,
-      final Object ctx) {
-    bk.mainWorkerPool.submitOrdered(ledgerId, new SafeRunnable() {
+      final Object ctx) throws InterruptedException {
+      bk.mainWorkerPool.submitOrdered(ledgerId, new SafeRunnable() {
       @Override
       @Override
       public void safeRun() {
       public void safeRun() {
         if (metadata.isClosed()) {
         if (metadata.isClosed()) {
@@ -279,7 +292,8 @@ public class LedgerHandle implements ReadCallback, AddCallback, CloseCallback {
         op.initiate(toSend);
         op.initiate(toSend);
 
 
       }
       }
-    });
+      });
+      opCounterSem.acquire();
   }
   }
 
 
   // close the ledger and send fails to all the adds in the pipeline
   // close the ledger and send fails to all the adds in the pipeline

+ 4 - 0
src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerOpenOp.java

@@ -114,6 +114,10 @@ class LedgerOpenOp implements DataCallback {
             LOG.error("Security exception while opening ledger: " + ledgerId, e);
             LOG.error("Security exception while opening ledger: " + ledgerId, e);
             cb.openComplete(BKException.Code.DigestNotInitializedException, null, this.ctx);
             cb.openComplete(BKException.Code.DigestNotInitializedException, null, this.ctx);
             return;
             return;
+        } catch (NumberFormatException e) {
+            LOG.error("Incorrectly entered parameter throttle: " + System.getProperty("throttle"), e);
+            cb.openComplete(BKException.Code.IncorrectParameterException, null, this.ctx);
+            return;
         }
         }
 
 
         if (metadata.close != LedgerMetadata.NOTCLOSED) {
         if (metadata.close != LedgerMetadata.NOTCLOSED) {

+ 13 - 4
src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerRecoveryOp.java

@@ -117,9 +117,13 @@ class LedgerRecoveryOp implements ReadEntryCallback, ReadCallback, AddCallback {
      * Try to read past the last confirmed.
      * Try to read past the last confirmed.
      */
      */
     private void doRecoveryRead() {
     private void doRecoveryRead() {
-        lh.lastAddConfirmed++;
-        lh.asyncReadEntries(lh.lastAddConfirmed, lh.lastAddConfirmed, this, null);
-
+        try{
+            lh.lastAddConfirmed++;
+            lh.asyncReadEntries(lh.lastAddConfirmed, lh.lastAddConfirmed, this, null);
+        } catch (InterruptedException e) {
+            LOG.error("Interrupted while trying to read entry.", e);
+            Thread.currentThread().interrupt();
+        }
     }
     }
 
 
     @Override
     @Override
@@ -127,7 +131,12 @@ class LedgerRecoveryOp implements ReadEntryCallback, ReadCallback, AddCallback {
         // get back to prev value
         // get back to prev value
         lh.lastAddConfirmed--;
         lh.lastAddConfirmed--;
         if (rc == BKException.Code.OK) {
         if (rc == BKException.Code.OK) {
-            lh.asyncAddEntry(seq.nextElement().getEntry(), this, null);
+            try{
+                lh.asyncAddEntry(seq.nextElement().getEntry(), this, null);
+            } catch (InterruptedException e) {
+                LOG.error("Interrupted while adding entry.", e);
+                Thread.currentThread().interrupt();
+            }
             return;
             return;
         }
         }
 
 

+ 1 - 0
src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/PendingAddOp.java

@@ -132,6 +132,7 @@ class PendingAddOp implements WriteCallback {
 
 
     void submitCallback(final int rc) {
     void submitCallback(final int rc) {
         cb.addComplete(rc, lh, entryId, ctx);
         cb.addComplete(rc, lh, entryId, ctx);
+        lh.opCounterSem.release();
     }
     }
 
 
 }
 }

+ 6 - 2
src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/PendingReadOp.java

@@ -87,7 +87,7 @@ class PendingReadOp implements Enumeration<LedgerEntry>, ReadEntryCallback {
         if (entry.nextReplicaIndexToReadFrom >= lh.metadata.quorumSize) {
         if (entry.nextReplicaIndexToReadFrom >= lh.metadata.quorumSize) {
             // we are done, the read has failed from all replicas, just fail the
             // we are done, the read has failed from all replicas, just fail the
             // read
             // read
-            cb.readComplete(lastErrorCode, lh, null, ctx);
+            submitCallback(lastErrorCode);
             return;
             return;
         }
         }
 
 
@@ -126,11 +126,15 @@ class PendingReadOp implements Enumeration<LedgerEntry>, ReadEntryCallback {
         entry.entryDataStream = is;
         entry.entryDataStream = is;
 
 
         if (numPendingReads == 0) {
         if (numPendingReads == 0) {
-            cb.readComplete(BKException.Code.OK, lh, PendingReadOp.this, PendingReadOp.this.ctx);
+            submitCallback(BKException.Code.OK);
         }
         }
 
 
     }
     }
 
 
+    private void submitCallback(int code){
+        cb.readComplete(code, lh, PendingReadOp.this, PendingReadOp.this.ctx);
+        lh.opCounterSem.release();
+    }
     public boolean hasMoreElements() {
     public boolean hasMoreElements() {
         return !seq.isEmpty();
         return !seq.isEmpty();
     }
     }

+ 3 - 0
src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java

@@ -23,6 +23,7 @@ import java.net.InetSocketAddress;
 import java.util.ArrayDeque;
 import java.util.ArrayDeque;
 import java.util.Queue;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Semaphore;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicLong;
 
 
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BKException;
@@ -69,6 +70,7 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan
 
 
     InetSocketAddress addr;
     InetSocketAddress addr;
     boolean connected = false;
     boolean connected = false;
+    Semaphore opCounterSem = new Semaphore(2000);
     AtomicLong totalBytesOutstanding;
     AtomicLong totalBytesOutstanding;
     ClientSocketChannelFactory channelFactory;
     ClientSocketChannelFactory channelFactory;
     OrderedSafeExecutor executor;
     OrderedSafeExecutor executor;
@@ -206,6 +208,7 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan
             Object ctx) {
             Object ctx) {
 
 
         final int entrySize = toSend.readableBytes();
         final int entrySize = toSend.readableBytes();
+        
         // if (totalBytesOutstanding.get() > maxMemory) {
         // if (totalBytesOutstanding.get() > maxMemory) {
         // // TODO: how to throttle, throw an exception, or call the callback?
         // // TODO: how to throttle, throw an exception, or call the callback?
         // // Maybe this should be done at the layer above?
         // // Maybe this should be done at the layer above?

+ 95 - 0
src/contrib/bookkeeper/test/org/apache/bookkeeper/test/BookieReadWriteTest.java

@@ -237,6 +237,101 @@ public class BookieReadWriteTest extends BaseTestCase implements AddCallback, Re
         }
         }
     }
     }
 
 
+    @Test
+    public void testReadWriteAsyncSingleClientThrottle() throws IOException {
+        try {
+            // Create a BookKeeper client and a ledger
+            System.setProperty("throttle", "1000");
+            bkc = new BookKeeper("127.0.0.1");
+            lh = bkc.createLedger(digestType, ledgerPassword);
+            // bkc.initMessageDigest("SHA1");
+            ledgerId = lh.getId();
+            LOG.info("Ledger ID: " + lh.getId());
+            
+            numEntriesToWrite = 20000; 
+            for (int i = 0; i < (numEntriesToWrite - 10000); i++) {
+                ByteBuffer entry = ByteBuffer.allocate(4);
+                entry.putInt(rng.nextInt(maxInt));
+                entry.position(0);
+
+                entries.add(entry.array());
+                entriesSize.add(entry.array().length);
+                lh.asyncAddEntry(entry.array(), this, sync);
+            }
+            
+
+            for (int i = 0; i < 10000; i++) {
+                ByteBuffer entry = ByteBuffer.allocate(4);
+                entry.putInt(rng.nextInt(maxInt));
+                entry.position(0);
+
+                entries.add(entry.array());
+                entriesSize.add(entry.array().length);
+                lh.asyncAddEntry(entry.array(), this, sync);
+            }
+            
+            // wait for all entries to be acknowledged
+            synchronized (sync) {
+                while (sync.counter < numEntriesToWrite) {
+                    LOG.debug("Entries counter = " + sync.counter);
+                    sync.wait();
+                }
+            }
+
+            LOG.debug("*** WRITE COMPLETE ***");
+            // close ledger
+            lh.close();
+
+            // *** WRITING PART COMPLETE // READ PART BEGINS ***
+            
+            // open ledger
+            lh = bkc.openLedger(ledgerId, digestType, ledgerPassword);
+            LOG.debug("Number of entries written: " + (lh.getLastAddConfirmed() + 1));
+            assertTrue("Verifying number of entries written", lh.getLastAddConfirmed() == (numEntriesToWrite - 1));
+
+            // read entries
+            lh.asyncReadEntries(0, numEntriesToWrite - 1, this, (Object) sync);
+
+            synchronized (sync) {
+                while (sync.value == false) {
+                    sync.wait();
+                }
+            }
+
+            LOG.debug("*** READ COMPLETE ***");
+
+            // at this point, LedgerSequence ls is filled with the returned
+            // values
+            int i = 0;
+            while (ls.hasMoreElements()) {
+                ByteBuffer origbb = ByteBuffer.wrap(entries.get(i));
+                Integer origEntry = origbb.getInt();
+                byte[] entry = ls.nextElement().getEntry();
+                ByteBuffer result = ByteBuffer.wrap(entry);
+                LOG.debug("Length of result: " + result.capacity());
+                LOG.debug("Original entry: " + origEntry);
+
+                Integer retrEntry = result.getInt();
+                LOG.debug("Retrieved entry: " + retrEntry);
+                assertTrue("Checking entry " + i + " for equality", origEntry.equals(retrEntry));
+                assertTrue("Checking entry " + i + " for size", entry.length == entriesSize.get(i).intValue());
+                i++;
+            }
+            assertTrue("Checking number of read entries", i == numEntriesToWrite);
+
+            lh.close();
+        } catch (KeeperException e) {
+            LOG.error("Test failed", e);
+            fail("Test failed due to ZooKeeper exception");
+        } catch (BKException e) {
+            LOG.error("Test failed", e);
+            fail("Test failed due to BookKeeper exception");
+        } catch (InterruptedException e) {
+            LOG.error("Test failed", e);
+            fail("Test failed due to interruption");
+        }
+    }
+    
     @Test
     @Test
     public void testSyncReadAsyncWriteStringsSingleClient() throws IOException {
     public void testSyncReadAsyncWriteStringsSingleClient() throws IOException {
         LOG.info("TEST READ WRITE STRINGS MIXED SINGLE CLIENT");
         LOG.info("TEST READ WRITE STRINGS MIXED SINGLE CLIENT");