Prechádzať zdrojové kódy

ZOOKEEPER-288. Cleanup and fixes to BookKeeper (flavio via mahadev)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/zookeeper/trunk@761077 13f79535-47bb-0310-9956-ffa450edef68
Mahadev Konar 16 rokov pred
rodič
commit
05eab5e567
17 zmenil súbory, kde vykonal 579 pridanie a 348 odobranie
  1. 2 0
      CHANGES.txt
  2. 30 6
      src/contrib/bookkeeper/src/java/org/apache/bookkeeper/bookie/Bookie.java
  3. 81 0
      src/contrib/bookkeeper/src/java/org/apache/bookkeeper/bookie/BookieException.java
  4. 1 1
      src/contrib/bookkeeper/src/java/org/apache/bookkeeper/bookie/LedgerDescriptor.java
  5. 19 28
      src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BookKeeper.java
  6. 44 51
      src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BookieHandle.java
  7. 23 4
      src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/ClientCBWorker.java
  8. 0 5
      src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerEntry.java
  9. 56 13
      src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerHandle.java
  10. 30 3
      src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerRecoveryMonitor.java
  11. 71 63
      src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/QuorumEngine.java
  12. 112 135
      src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/QuorumOpMonitor.java
  13. 71 31
      src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/BookieClient.java
  14. 7 1
      src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/BookieProtocol.java
  15. 17 1
      src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/BookieServer.java
  16. 10 6
      src/contrib/bookkeeper/test/org/apache/bookkeeper/test/BookieClientTest.java
  17. 5 0
      src/contrib/bookkeeper/test/org/apache/bookkeeper/test/LoopbackClient.java

+ 2 - 0
CHANGES.txt

@@ -59,6 +59,8 @@ mahadev)
 
   ZOOKEEPER-349. to automate patch testing. (giridharan kesavan via mahadev)
 
+  ZOOKEEPER-288. Cleanup and fixes to BookKeeper (flavio via mahadev)
+
 NEW FEATURES:
 
 

+ 30 - 6
src/contrib/bookkeeper/src/java/org/apache/bookkeeper/bookie/Bookie.java

@@ -27,10 +27,12 @@ import java.io.RandomAccessFile;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 import java.util.HashMap;
+import java.util.WeakHashMap;
 import java.util.LinkedList;
 import java.util.Random;
 import java.util.concurrent.LinkedBlockingQueue;
 
+import org.apache.bookkeeper.bookie.BookieException;
 import org.apache.bookkeeper.client.AddCallback;
 import org.apache.log4j.Logger;
 
@@ -53,6 +55,8 @@ public class Bookie extends Thread {
 
     final File ledgerDirectories[];
     
+    WeakHashMap<Long, ByteBuffer> masterKeys = new WeakHashMap<Long, ByteBuffer>();
+    
     public static class NoLedgerException extends IOException {
         private static final long serialVersionUID = 1L;
         private long ledgerId;
@@ -93,6 +97,20 @@ public class Bookie extends Thread {
         }
     }
 
+    private LedgerDescriptor getHandle(long ledgerId, boolean readonly, byte[] masterKey) throws IOException {
+        LedgerDescriptor handle = null;
+        synchronized (ledgers) {
+            handle = ledgers.get(ledgerId);
+            if (handle == null) {
+                handle = createHandle(ledgerId, readonly);
+                ledgers.put(ledgerId, handle);
+                masterKeys.put(ledgerId, ByteBuffer.wrap(masterKey));
+            } 
+            handle.incRef();
+        }
+        return handle;
+    }
+    
     private LedgerDescriptor getHandle(long ledgerId, boolean readonly) throws IOException {
         LedgerDescriptor handle = null;
         synchronized (ledgers) {
@@ -100,11 +118,12 @@ public class Bookie extends Thread {
             if (handle == null) {
                 handle = createHandle(ledgerId, readonly);
                 ledgers.put(ledgerId, handle);
-            }
+            } 
             handle.incRef();
         }
         return handle;
     }
+    
 
     private LedgerDescriptor createHandle(long ledgerId, boolean readOnly) throws IOException {
         RandomAccessFile ledgerFile = null;
@@ -266,10 +285,15 @@ public class Bookie extends Thread {
         }
     }
     
-    public void addEntry(ByteBuffer entry, AddCallback cb, Object ctx)
-            throws IOException {
+    public void addEntry(ByteBuffer entry, AddCallback cb, Object ctx, byte[] masterKey)
+            throws IOException, BookieException {
+        
         long ledgerId = entry.getLong();
-        LedgerDescriptor handle = getHandle(ledgerId, false);
+        LedgerDescriptor handle = getHandle(ledgerId, false, masterKey);
+        
+        if(!masterKeys.get(ledgerId).equals(ByteBuffer.wrap(masterKey))){
+            throw BookieException.create(BookieException.Code.UnauthorizedAccessException);
+        }
         try {
             entry.rewind();
             long entryId = handle.addEntry(entry);
@@ -323,7 +347,7 @@ public class Bookie extends Thread {
      * @throws InterruptedException
      */
     public static void main(String[] args) throws IOException,
-            InterruptedException {
+            InterruptedException, BookieException {
         Bookie b = new Bookie(new File("/tmp"), new File[] { new File("/tmp") });
         CounterCallback cb = new CounterCallback();
         long start = System.currentTimeMillis();
@@ -334,7 +358,7 @@ public class Bookie extends Thread {
             buff.limit(1024);
             buff.position(0);
             cb.incCount();
-            b.addEntry(buff, cb, null);
+            b.addEntry(buff, cb, null, new byte[0]);
         }
         cb.waitZero();
         long end = System.currentTimeMillis();

+ 81 - 0
src/contrib/bookkeeper/src/java/org/apache/bookkeeper/bookie/BookieException.java

@@ -0,0 +1,81 @@
+package org.apache.bookkeeper.bookie;
+
+/*
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * 
+ */
+ 
+ 
+ import java.lang.Exception;
+ 
+ @SuppressWarnings("serial")
+public abstract class BookieException extends Exception {
+
+    private int code;
+    public BookieException(int code){
+        this.code = code;
+    }
+    
+    public static BookieException create(int code){
+        switch(code){
+        case Code.UnauthorizedAccessException:
+            return new BookieUnauthorizedAccessException();
+        default:
+            return new BookieIllegalOpException();
+        }
+    }
+    
+    public interface Code {
+        int OK = 0;
+        int UnauthorizedAccessException = -1;
+        
+        int IllegalOpException = -100;
+    }
+    
+    public void setCode(int code){
+        this.code = code;
+    }
+    
+    public int getCode(){
+        return this.code;
+    }
+    
+    public String getMessage(int code){
+        switch(code){
+        case Code.OK:
+            return "No problem";
+        case Code.UnauthorizedAccessException:
+            return "Error while reading ledger";
+        default:
+            return "Invalid operation";
+        }
+    }
+    
+    public static class BookieUnauthorizedAccessException extends BookieException {
+        public BookieUnauthorizedAccessException(){
+            super(Code.UnauthorizedAccessException);
+        }   
+    }
+    
+    public static class BookieIllegalOpException extends BookieException {
+        public BookieIllegalOpException(){
+            super(Code.UnauthorizedAccessException);
+        }   
+    }
+}

+ 1 - 1
src/contrib/bookkeeper/src/java/org/apache/bookkeeper/bookie/LedgerDescriptor.java

@@ -76,7 +76,7 @@ public class LedgerDescriptor {
          */
         offsetBuffer.rewind();
         offsetBuffer.putLong(ledger.position());
-        LOG.debug("Offset: " + ledger.position() + ", " + entry.position() + ", " + calcEntryOffset(entryId) + ", " + entryId);
+        //LOG.debug("Offset: " + ledger.position() + ", " + entry.position() + ", " + calcEntryOffset(entryId) + ", " + entryId);
         offsetBuffer.flip();
         
         /*

+ 19 - 28
src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BookKeeper.java

@@ -278,9 +278,12 @@ implements ReadCallback, AddCallback, Watcher {
         	    String bookie = list.remove(index);
         	    LOG.info("Bookie: " + bookie);
         	    InetSocketAddress tAddr = parseAddr(bookie);
-        	    lh.addBookie(tAddr);         	
+        	    int bindex = lh.addBookie(tAddr); 
+        	    ByteBuffer bindexBuf = ByteBuffer.allocate(4);
+        	    bindexBuf.putInt(bindex);
+        	    
         	    String pBookie = "/" + bookie;
-        	    zk.create(prefix + getZKStringId(lId) + ensemble + pBookie, new byte[0], 
+        	    zk.create(prefix + getZKStringId(lId) + ensemble + pBookie, bindexBuf.array(), 
         	            Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
         	} catch (IOException e) {
         	    LOG.error(e);
@@ -382,12 +385,18 @@ implements ReadCallback, AddCallback, Watcher {
         
         List<String> list = 
             zk.getChildren(prefix + getZKStringId(lId) + ensemble, false);
-                
-        for(String s : list){
-            try{
-                lh.addBookie(parseAddr(s));
-            } catch (IOException e){
-                LOG.error(e);
+        
+        for(int i = 0 ; i < list.size() ; i++){
+            for(String s : list){
+                byte[] bindex = zk.getData(prefix + getZKStringId(lId) + ensemble + "/" + s, false, stat);
+                ByteBuffer bindexBuf = ByteBuffer.wrap(bindex);
+                if(bindexBuf.getInt() == i){                      
+                    try{
+                        lh.addBookie(parseAddr(s));
+                    } catch (IOException e){
+                        LOG.error(e);
+                    }
+                }
             }
         }
       
@@ -451,31 +460,13 @@ implements ReadCallback, AddCallback, Watcher {
      */
     public void asyncAddEntry(LedgerHandle lh, byte[] data, AddCallback cb, Object ctx)
     throws InterruptedException {
-        LOG.debug("Adding entry asynchronously: " + data);
-        //lh.incLast();
+       
         if(lh != null){
             AddOp r = new AddOp(lh, data, cb, ctx);
             engines.get(lh.getId()).sendOp(r);
         }
-        //qeMap.get(lh.getId()).put(r);
     }
     
-    /**
-     * Add entry asynchronously to an open ledger.
-     */
-    //public  void asyncAddEntryVerifiable(LedgerHandle lh, byte[] data, AddCallback cb, Object ctx)
-    //throws InterruptedException, IOException, BKException, NoSuchAlgorithmException {
-    //    if(md == null)
-    //        throw BKException.create(Code.DigestNotInitializedException);
-    //        
-    //    LOG.info("Data size: " + data.length);
-    //    AddOp r = new AddOp(lh, data, cb, ctx);
-    //    //r.addDigest();
-    //    LOG.info("Data length: " + r.data.length);
-    //    engines.get(lh.getId()).sendOp(r);
-    //    //qeMap.get(lh.getId()).put(r);
-    //}
-    
     
     /**
      * Read a sequence of entries synchronously.
@@ -496,7 +487,7 @@ implements ReadCallback, AddCallback, Watcher {
         
         Operation r = new ReadOp(lh, firstEntry, lastEntry, this, counter);
         engines.get(lh.getId()).sendOp(r);
-        //qeMap.get(lh.getId()).put(r);
+        
         LOG.debug("Going to wait for read entries: " + counter.i);
         counter.block(0);
         LOG.debug("Done with waiting: " + counter.i + ", " + firstEntry);

+ 44 - 51
src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BookieHandle.java

@@ -27,7 +27,10 @@ import java.nio.ByteBuffer;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.security.NoSuchAlgorithmException;
+import java.security.InvalidKeyException;
 import java.security.MessageDigest;
+import javax.crypto.Mac; 
+import javax.crypto.spec.SecretKeySpec;
 
 import org.apache.bookkeeper.client.LedgerHandle.QMode;
 import org.apache.bookkeeper.client.QuorumEngine.Operation;
@@ -84,9 +87,11 @@ public class BookieHandle extends Thread{
         this.addr = addr;
         this.incomingQueue = new ArrayBlockingQueue<ToSend>(2000);
         
+        //genSecurePadding();
         start();
     }
     
+    
     /**
      * Restart BookieClient if can't talk to bookie
      * 
@@ -112,11 +117,6 @@ public class BookieHandle extends Thread{
         } catch(InterruptedException e){
             e.printStackTrace();
         }
-        //client.addEntry(self.getId(), 
-        //        r.entry, 
-        //        ByteBuffer.wrap(r.data), 
-        //        cb,
-        //        ctx);
     }
     
     /**
@@ -124,6 +124,7 @@ public class BookieHandle extends Thread{
      * 
      */
     MessageDigest digest = null;
+    Mac mac = null;
     
     /** 
      * Get digest instance if there is none.
@@ -138,40 +139,14 @@ public class BookieHandle extends Thread{
         return digest;
     }
     
-    /**
-     * Computes the digest for a given ByteBuffer.
-     * 
-     */
-    
-    public ByteBuffer addDigest(long entryId, ByteBuffer data)
-    throws NoSuchAlgorithmException, IOException {
-        if(digest == null)
-            getDigestInstance(self.getDigestAlg());
-        
-        ByteBuffer bb = ByteBuffer.allocate(8 + 8);
-        bb.putLong(self.getId());
-        bb.putLong(entryId);
-        
-        byte[] msgDigest;
-        
-        // synchronized(LedgerHandle.digest){
-        digest.update(self.getPasswdHash());
-        digest.update(bb.array());
-        digest.update(data.array());
-            
-        //baos.write(data);
-        //baos.write(Operation.digest.digest());
-        msgDigest = digest.digest();
-        //}
-        ByteBuffer extendedData = ByteBuffer.allocate(data.capacity() + msgDigest.length);
-        data.rewind();
-        extendedData.put(data);
-        extendedData.put(msgDigest);
-        
-        //LOG.debug("Data length (" + self.getId() + ", " + entryId + "): " + data.capacity());
-        //LOG.debug("Digest: " + new String(msgDigest));
+    Mac getMac(String alg)
+    throws NoSuchAlgorithmException, InvalidKeyException {
+        if(mac == null){
+            mac = Mac.getInstance(alg);
+            mac.init(new SecretKeySpec(self.getMacKey(), "HmacSHA1"));
+        }
         
-        return extendedData;
+        return mac;
     }
     
     /**
@@ -207,22 +182,38 @@ public class BookieHandle extends Thread{
                          * TODO: Really add the confirmed add to the op
                          */
                         long confirmed = self.getAddConfirmed();
-                        //LOG.info("Confirmed: " + confirmed);
-                        ByteBuffer extendedData = ByteBuffer.allocate(op.data.length + 8);
-                        extendedData.putLong(confirmed);
-                        extendedData.put(op.data);
-                        extendedData.rewind();
-                        
+                        ByteBuffer extendedData;
+    
                         if(self.getQMode() == QMode.VERIFIABLE){
-                            extendedData = addDigest(ts.entry, extendedData);
+                            extendedData = ByteBuffer.allocate(op.data.length + 28 + 16);
+                            extendedData.putLong(self.getId());
+                            extendedData.putLong(ts.entry);
+                            extendedData.putLong(confirmed);
+                            extendedData.put(op.data);
+                        
+                        
+                            extendedData.rewind();
+                            byte[] toProcess = new byte[op.data.length + 24];
+                            extendedData.get(toProcess, 0, op.data.length + 24);
+                            //extendedData.limit(extendedData.capacity() - 20);
+                            extendedData.position(extendedData.capacity() - 20);
+                            if(mac == null)
+                                getMac("HmacSHA1");
+                            extendedData.put(mac.doFinal(toProcess));
+                            extendedData.position(16);
+                        } else {
+                            extendedData = ByteBuffer.allocate(op.data.length + 8);
+                            extendedData.putLong(confirmed);
+                            extendedData.put(op.data);
+                            extendedData.flip();
                         }
                         
-                        //LOG.debug("Extended data: " + extendedData.capacity());
-                        client.addEntry(self.getId(), 
-                            ts.entry, 
-                            extendedData, 
-                            aOp.wcb,
-                            ts.ctx);
+                        client.addEntry(self.getId(),
+                                self.getLedgerKey(),
+                                ts.entry, 
+                                extendedData, 
+                                aOp.wcb,
+                                ts.ctx);
                         break;
                     case Operation.READ:
                         client.readEntry(self.getId(),
@@ -238,6 +229,8 @@ public class BookieHandle extends Thread{
                 LOG.error(e);
             } catch (NoSuchAlgorithmException e){
                 LOG.error(e);
+            } catch (InvalidKeyException e) {
+                LOG.error(e);
             }
         }
     }

+ 23 - 4
src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/ClientCBWorker.java

@@ -54,28 +54,48 @@ class ClientCBWorker extends Thread{
         return instance;
     }
     
+    /**
+     * Constructor initiates queue of pending operations and
+     * runs thread.
+     * 
+     */
     ClientCBWorker(){
        pendingOps = new ArrayBlockingQueue<Operation>(4000);  
        start();
        LOG.debug("Have started cbWorker");
     }
     
+    
+    /**
+     * Adds operation to queue of pending.
+     * 
+     * @param   op  operation to add to queue
+     */
+    
     void addOperation(Operation op) 
     throws InterruptedException {
         pendingOps.put(op);
-        LOG.debug("Added operation to queue of pending");
     }
     
+    /**
+     * Gets thread out of its main loop.
+     * 
+     */
     synchronized void shutdown(){
         stop = true;
         instance = null;
         LOG.debug("Shutting down");
     }
     
+    
+    /**
+     * Main thread loop.
+     * 
+     */
+    
     public void run(){
         try{
             while(!stop){
-                LOG.debug("Going to sleep on queue");
                 Operation op = pendingOps.poll(1000, TimeUnit.MILLISECONDS);
                 if(op != null){
                     synchronized(op){
@@ -83,7 +103,6 @@ class ClientCBWorker extends Thread{
                             op.wait();
                         }
                     }
-                    LOG.debug("Request ready");
 
                     switch(op.type){
                     case Operation.ADD:
@@ -96,7 +115,7 @@ class ClientCBWorker extends Thread{
                         break;
                     case Operation.READ:
                         ReadOp rOp = (ReadOp) op;
-                        LOG.debug("Got one message from the queue: " + rOp.firstEntry);
+                        //LOG.debug("Got one message from the queue: " + rOp.firstEntry);
                         rOp.cb.readComplete(rOp.getErrorCode(), 
                             rOp.getLedger().getId(), 
                             new LedgerSequence(rOp.seq), 

+ 0 - 5
src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerEntry.java

@@ -31,7 +31,6 @@ import org.apache.log4j.Logger;
  * 
  */
 
-
 public class LedgerEntry {
     Logger LOG = Logger.getLogger(LedgerEntry.class);
     
@@ -43,10 +42,6 @@ public class LedgerEntry {
         this.lId = lId;
         this.eId = eId;
         this.entry = entry;
-        if(entry != null)
-            LOG.debug("Entry: " + entry.length + " , " + new String(entry));
-        else
-            LOG.debug("Entry is null");
     }
     
     public long getLedgerId(){

+ 56 - 13
src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerHandle.java

@@ -62,7 +62,8 @@ public class LedgerHandle {
     private int threshold;
     private String digestAlg = "SHA1";
     
-    private byte[] passwdHash;
+    private byte[] macKey;
+    private byte[] ledgerKey;
     private byte[] passwd;
     
     LedgerHandle(BookKeeper bk, 
@@ -74,7 +75,8 @@ public class LedgerHandle {
         this.last = last;
         this.bookies = new ArrayList<BookieHandle>();
         this.passwd = passwd;
-        genPasswdHash(passwd);
+        genLedgerKey(passwd);
+        genMacKey(passwd);
 
         this.qSize = (bookies.size() + 1)/2;
     }
@@ -93,7 +95,8 @@ public class LedgerHandle {
         this.qSize = qSize;
         this.qMode = mode;
         this.passwd = passwd;
-        genPasswdHash(passwd);
+        genLedgerKey(passwd);
+        genMacKey(passwd);
     }
         
         
@@ -109,7 +112,8 @@ public class LedgerHandle {
 
         this.qSize = qSize;
         this.passwd = passwd;
-        genPasswdHash(passwd);
+        genLedgerKey(passwd);
+        genMacKey(passwd);
     }
     
     private void setBookies(ArrayList<InetSocketAddress> bookies)
@@ -135,17 +139,20 @@ public class LedgerHandle {
     }
     
     
+    
     /**
      * Create bookie handle and add it to the list
      * 
      * @param addr	socket address
      */
-    void addBookie(InetSocketAddress addr)
+    int addBookie(InetSocketAddress addr)
     throws IOException {
         BookieHandle bh = new BookieHandle(this, addr);
         this.bookies.add(bh);
         
         if(bookies.size() > qSize) setThreshold();
+        
+        return (this.bookies.size() - 1);
     }
     
     private void setThreshold(){
@@ -311,23 +318,50 @@ public class LedgerHandle {
     }
     
     /**
-     * Generates and stores password hash.
+     * Generates and stores Ledger key.
      * 
      * @param passwd
      */
     
-    private void genPasswdHash(byte[] passwd){
+    private void genLedgerKey(byte[] passwd){
         try{
-            MessageDigest digest = MessageDigest.getInstance("MD5");
+            MessageDigest digest = MessageDigest.getInstance("SHA");
+            String pad = "ledger";
+            
+            byte[] toProcess = new byte[passwd.length + pad.length()];
+            System.arraycopy(pad.getBytes(), 0, toProcess, 0, pad.length());
+            System.arraycopy(passwd, 0, toProcess, pad.length(), passwd.length);
         
-            digest.update(passwd);
-            this.passwdHash = digest.digest();
+            digest.update(toProcess);
+            this.ledgerKey = digest.digest();
         } catch(NoSuchAlgorithmException e){
             this.passwd = passwd;
             LOG.error("Storing password as plain text because secure hash implementation does not exist");
         }
     }
     
+    /**
+     * Generates and stores Mac key.
+     * 
+     * @param passwd
+     */
+    
+    private void genMacKey(byte[] passwd){
+        try{
+            MessageDigest digest = MessageDigest.getInstance("SHA");
+            String pad = "mac";
+            
+            byte[] toProcess = new byte[passwd.length + pad.length()];
+            System.arraycopy(pad.getBytes(), 0, toProcess, 0, pad.length());
+            System.arraycopy(passwd, 0, toProcess, pad.length(), passwd.length);
+        
+            digest.update(toProcess);
+            this.macKey = digest.digest();
+        } catch(NoSuchAlgorithmException e){
+            this.passwd = passwd;
+            LOG.error("Storing password as plain text because secure hash implementation does not exist");
+        }
+    }
     
     /**
      * Returns password in plain text
@@ -338,12 +372,21 @@ public class LedgerHandle {
     
     
     /**
-     * Returns password hash
+     * Returns MAC key
      * 
      * @return byte[]
      */
-    byte[] getPasswdHash(){
-       return passwdHash; 
+    byte[] getMacKey(){
+       return macKey; 
     }
    
+    /**
+     * Returns Ledger key
+     * 
+     * @return byte[]
+     */
+    byte[] getLedgerKey(){
+       return ledgerKey; 
+    }
+    
 }

+ 30 - 3
src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerRecoveryMonitor.java

@@ -64,6 +64,15 @@ class LedgerRecoveryMonitor implements ReadEntryCallback{
     
     private int minimum;
     
+    /**
+     * Constructor simply initiates data structures
+     * 
+     * @param self  Instance of BookKeeper
+     * @param lId   Ledger identifier
+     * @param qSize Quorum size
+     * @param bookies   List of bookie addresses
+     * @param qMode     Quorum mode
+     */
     LedgerRecoveryMonitor(BookKeeper self,
             long lId, 
             int qSize, 
@@ -88,6 +97,13 @@ class LedgerRecoveryMonitor implements ReadEntryCallback{
         
     }
     
+    
+    /**
+     * Determines the last entry written to a ledger not closed properly
+     * due to a client crash
+     * 
+     * @param   passwd  
+     */
     boolean recover(byte[] passwd) throws 
     IOException, InterruptedException, BKException, KeeperException {
         /*
@@ -139,9 +155,11 @@ class LedgerRecoveryMonitor implements ReadEntryCallback{
                     //if(ls == null) throw BKException.create(Code.ReadException);
                     LOG.debug("Received entry for: " + lh.getLast());
                     
-                    if(ls.nextElement().getEntry() != null){
+                    byte[] le = ls.nextElement().getEntry();
+                    if(le != null){
                         if(notLegitimate) notLegitimate = false;
-                        lh.incLast();
+                        self.addEntry(lh, le);
+                        //lh.incLast();
                         hasMore = true;
                     }
                 }
@@ -165,7 +183,16 @@ class LedgerRecoveryMonitor implements ReadEntryCallback{
                 
     }
     
-    
+    /**
+     * Read callback implementation
+     * 
+     * @param rc    return code
+     * @param ledgerId  Ledger identifier
+     * @param entryId   Entry identifier
+     * @param bb        Data
+     * @param ctx       Control object
+     * 
+     */
     public void readEntryComplete(int rc, long ledgerId, long entryId, ByteBuffer bb, Object ctx){
         if(rc == 0){
             bb.rewind();

+ 71 - 63
src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/QuorumEngine.java

@@ -49,7 +49,6 @@ import org.apache.log4j.Logger;
 public class QuorumEngine {
     Logger LOG = Logger.getLogger(QuorumEngine.class);
 
-    //ArrayBlockingQueue<Operation> incomingQueue;
     QuorumOpMonitor opMonitor;
     ClientCBWorker cbWorker;
     LedgerHandle lh;
@@ -57,8 +56,9 @@ public class QuorumEngine {
     boolean stop = false;
     
     /**
-     * Requests generated by BookKeeper.java upon client calls.
-     * There are three types of requests: READ, ADD, STOP.
+     * Operation descriptor: Requests generated by BookKeeper.java
+     * upon client calls. There are three types of requests: READ, 
+     * ADD, STOP.
      */
     
     public static class Operation {
@@ -192,74 +192,82 @@ public class QuorumEngine {
         this.cbWorker = ClientCBWorker.getInstance();
         LOG.debug("Created cbWorker");
     }
-    
+  
+    /**
+     * Sends requests to BookieHandle instances. Methods in BookKeeper call
+     * this method to submit both add and read requests.
+     * 
+     * @param r Operation descriptor
+     */
     void sendOp(Operation r)
     throws InterruptedException {
-                switch(r.type){
-                case Operation.READ:
-                    Operation.ReadOp rOp = (Operation.ReadOp) r;
-                    LOG.debug("Adding read operation to opMonitor: " + rOp.firstEntry + ", " + rOp.lastEntry);
-                    cbWorker.addOperation(r);
-                    
-                    for(long entry = rOp.firstEntry; 
-                        entry <= rOp.lastEntry;
-                        entry++){
-                        
-                        //Send requests to bookies
-                        for(BookieHandle bh : lh.getBookies()){
-                            try{
-                                SubOp.SubReadOp sRead = new SubOp.SubReadOp(rOp, 
-                                        new PendingReadOp(lh), 
-                                        lh.getBookies().indexOf(bh),
-                                        opMonitor);
-                                bh.sendRead(sRead, entry);
-                        
-                            } catch(IOException e){
-                                LOG.error(e);
-                            }
-                        }  
+        
+        int n = lh.getBookies().size();
+        switch(r.type){
+        case Operation.READ:
+            Operation.ReadOp rOp = (Operation.ReadOp) r;
+            
+            LOG.debug("Adding read operation to opMonitor: " + rOp.firstEntry + ", " + rOp.lastEntry);
+            cbWorker.addOperation(r);
+            
+            for(long entry = rOp.firstEntry; 
+            entry <= rOp.lastEntry;
+            entry++){
+                long counter = 0;
+                PendingReadOp pROp = new PendingReadOp(lh);
+                
+                //Send requests to bookies
+                while(counter < lh.getQuorumSize()){
+                    int index = (int)((entry + counter++) % n);
+                    try{
+                        SubOp.SubReadOp sRead = new SubOp.SubReadOp(rOp, 
+                                pROp, 
+                                index,
+                                opMonitor);
+                        lh.getBookies().get((index) % n).sendRead(sRead, entry);
+                                    
+                    } catch(IOException e){
+                        LOG.error(e);
                     }
-                    //r.cb.processResult();
-                    break;
-                case Operation.ADD:
-                    long counter = 0;
-                    //ArrayList<BookieHandle> list = lh.getBookies();
-                    int n = lh.getBookies().size();
-                    
-                    LOG.debug("Adding add operation to opMonitor");
-                    //opMonitor.addOp(lh, (Operation.AddOp) r);
-                    cbWorker.addOperation(r);
-                    Operation.AddOp aOp = (Operation.AddOp) r;
-                    PendingOp pOp = new PendingOp();
-                    while(counter < lh.getQuorumSize()  ){
-                        int index = (int)((aOp.entry + counter++) % n);
-                    	
-                        try{
-                            SubOp.SubAddOp sAdd = new 
-                                SubOp.SubAddOp(aOp, 
-                                    pOp, 
-                                    index,
-                                    opMonitor);
-                            lh.getBookies().get((index) % n).sendAdd(sAdd, aOp.entry);
-                        } catch (IOException io) {
-                            LOG.error(io);
-                            try{
-                                /*
-                                 * Before getting a new bookie, try to reconnect
-                                 */
-                                lh.getBookies().get((index) % n).restart();
-                            } catch (IOException nio){
-                                    lh.removeBookie(index);
-                            }
-                        }
+                }  
+            }
+  
+            break;
+        case Operation.ADD:
+            long counter = 0;
+            
+            cbWorker.addOperation(r);
+            Operation.AddOp aOp = (Operation.AddOp) r;
+            PendingOp pOp = new PendingOp();
+            while(counter < lh.getQuorumSize()  ){
+                int index = (int)((aOp.entry + counter++) % n);
+                
+                try{
+                    SubOp.SubAddOp sAdd = new 
+                    SubOp.SubAddOp(aOp, 
+                            pOp, 
+                            index,
+                            opMonitor);
+                    lh.getBookies().get((index) % n).sendAdd(sAdd, aOp.entry);
+                } catch (IOException io) {
+                    LOG.error(io);
+                    try{
+                        /*
+                         * Before getting a new bookie, try to reconnect
+                         */
+                        lh.getBookies().get((index) % n).restart();
+                    } catch (IOException nio){
+                        lh.removeBookie(index);
                     }
-                    //qRef = (qRef + 1) % n;
-                    break;
+                }
+            }
+            //qRef = (qRef + 1) % n;
+            break;
                 case Operation.STOP:
                     stop = true;
                     cbWorker.shutdown();
                     break;
-                }
+        }
     }
     
 }

+ 112 - 135
src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/QuorumOpMonitor.java

@@ -27,10 +27,14 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Arrays;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.ConcurrentHashMap;
 import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
+import java.security.InvalidKeyException;
+import javax.crypto.Mac; 
+import javax.crypto.spec.SecretKeySpec;
 
 
 import org.apache.bookkeeper.client.BookieHandle;
@@ -44,6 +48,7 @@ import org.apache.bookkeeper.client.QuorumEngine.Operation.AddOp;
 import org.apache.bookkeeper.client.QuorumEngine.Operation.ReadOp;
 import org.apache.bookkeeper.client.QuorumEngine.SubOp.SubAddOp;
 import org.apache.bookkeeper.client.QuorumEngine.SubOp.SubReadOp;
+import org.apache.bookkeeper.proto.BookieClient;
 import org.apache.bookkeeper.proto.ReadEntryCallback;
 import org.apache.bookkeeper.proto.WriteCallback;
 import org.apache.log4j.Logger;
@@ -77,6 +82,7 @@ public class QuorumOpMonitor implements WriteCallback, ReadEntryCallback {
      * 
      */
     MessageDigest digest = null;
+    int dLength;
     
     /** 
      * Get digest instance if there is none.
@@ -101,36 +107,9 @@ public class QuorumOpMonitor implements WriteCallback, ReadEntryCallback {
             this.bookieIdSent = new HashSet<Integer>();
             this.bookieIdRecv = new HashSet<Integer>();
         }
-
-        //PendingOp(Operation op){
-        //    this.op = op;
-        //    bookieIdSent = new HashSet<Integer>();
-        //    bookieIdRecv = new HashSet<Integer>();
-        //}
-        
-        //void setOp(Operation op){
-        //    this.op = op;
-        //}
-        
-        //Operation getOp(){
-        //    return this.op;
-        //}
         
     };
     
-    /**
-     * Objects of this type are used to keep track of the status of
-     * a given write request.
-     * 
-     *
-     */
-    //public static class PendingAddOp extends PendingOp{
-    //    AddOp op;  
-        
-    //    PendingAddOp(LedgerHandle lh, AddOp op){
-    //        this.op = op;
-    //    }
-    //}
     
     /**
      * Objects of this type are used to keep track of the status of
@@ -142,30 +121,27 @@ public class QuorumOpMonitor implements WriteCallback, ReadEntryCallback {
         /*
          * Values for ongoing reads
          */
-        ConcurrentHashMap<Long, ArrayList<ByteBuffer>> proposedValues;
-        
-        /*
-         * Bookies from which received a response
-         */
-        //ConcurrentHashMap<Long, HashSet<Integer>> received;
-        
-        
+
+        ArrayList<ByteBuffer> proposedValues;
+                
         PendingReadOp(LedgerHandle lh){
-            this.proposedValues = 
-                new ConcurrentHashMap<Long, ArrayList<ByteBuffer>>();
-            //this.received = 
-            //    new ConcurrentHashMap<Long, HashSet<Integer>>();
+            this.proposedValues =
+                new ArrayList<ByteBuffer>();
         }    
       
     }
     
     QuorumOpMonitor(LedgerHandle lh){
         this.lh = lh;
+        try{
+            this.dLength = getDigestInstance(lh.getDigestAlg()).getDigestLength();
+        } catch(NoSuchAlgorithmException e){
+            LOG.error("Problem with message digest: " + e);
+            this.dLength = 0;
+        }
     }
     
-    
-    
-    
+   
     /**
      * Callback method for write operations. There is one callback for
      * each write to a server.
@@ -258,89 +234,96 @@ public class QuorumOpMonitor implements WriteCallback, ReadEntryCallback {
          * Collect responses, and reply when there are sufficient 
          * answers.
          */
-        LOG.debug("New response: " + rc);
+        
         if(rc == 0){
             SubReadOp sRead = (SubReadOp) ctx;
             ReadOp rOp = (ReadOp) sRead.op;
             PendingReadOp pOp = sRead.pOp;
             if(pOp != null){
                 HashSet<Integer> received = pOp.bookieIdRecv;
-                //if(!received.containsKey(entryId)){
-                //    received.put(entryId, new HashSet<Integer>());
-                //}
+                
                 boolean result = received.add(sRead.bIndex);
                 int counter = -1;
                 if(result){
-
-                    if(!pOp.proposedValues.containsKey(entryId)){
-                        pOp.proposedValues.put(entryId, new ArrayList<ByteBuffer>());
-                    }
-                    ArrayList<ByteBuffer> list = pOp.proposedValues.get(entryId);
-                    list.add(bb);
-        
+                    
+                    ByteBuffer voted = null;
+                    ArrayList<ByteBuffer> list;
                     switch(lh.getQMode()){
-                        case VERIFIABLE:
-                            if(list.size() >= 1){
-                                try{
-                                    ByteBuffer voted = voteVerifiable(list);
-                                    if(voted != null){
-                                        LOG.debug("Voted: " + new String(voted.array()));
-                                    
-                                        MessageDigest md = getDigestInstance(lh.getDigestAlg());
-                                        int dlength = md.getDigestLength();
-                                        if(voted.capacity() - dlength > 0){
-                                            byte[] data = new byte[voted.capacity() - dlength - 24];
-                                            LOG.info("Digest length: " + dlength + ", " + data.length);
-                                            voted.position(24);
-                                            voted.get(data, 0, data.length);
-                                            counter = addNewEntry(new LedgerEntry(ledgerId, entryId, data), rOp);
-                                        } else {
-                                            LOG.error("Short message: " + voted.capacity());
-                                        }
-                                    }
-                                } catch(NoSuchAlgorithmException e){
-                                    LOG.error("Problem with message digest: " + e);
-                                } catch(BKException bke) {
-                                    LOG.error(bke.toString() + "( " + ledgerId + ", " + entryId + ", " + pOp.bookieIdRecv + ")");
-                                    countNacks((ReadOp) ((SubReadOp) ctx).op, (SubReadOp) ctx, ledgerId, entryId);
-                                }
+                    case VERIFIABLE:
+                        if(rOp.seq[(int) (entryId % (rOp.lastEntry - rOp.firstEntry + 1))] == null)
+                           try{
+                                voted = voteVerifiable(bb);
+                            } catch(NoSuchAlgorithmException e){
+                                LOG.error("Problem with message digest: " + e);
+                            } catch(BKException bke) {
+                                LOG.error(bke.toString() + "( " + ledgerId + ", " + entryId + ", " + pOp.bookieIdRecv + ")");
+                                countNacks((ReadOp) ((SubReadOp) ctx).op, (SubReadOp) ctx, ledgerId, entryId);
+                            } catch(InvalidKeyException e){
+                                LOG.error(e);
                             }
-                            break;
-                        case GENERIC:
-                            if(list.size() >= ((lh.getQuorumSize() + 1)/2)){
-                                ByteBuffer voted = voteGeneric(list, (lh.getQuorumSize() + 1)/2);
-                                if(voted != null){
-                                    LOG.debug("Voted: " + voted.array());
-                                    byte[] data = new byte[voted.capacity() - 24];
-                                    voted.position(24);
+ 
+                            if(voted != null) { 
+                                if(voted.capacity() - dLength > 0){
+                                    byte[] data = new byte[voted.capacity() - dLength - 24];
+                                    voted.position(24);                                    
                                     voted.get(data, 0, data.length);
                                     counter = addNewEntry(new LedgerEntry(ledgerId, entryId, data), rOp);
-                                }
+                                } 
                             }
-                            break;
-                        case FREEFORM:
-                        	if(list.size() == lh.getQuorumSize()){
-                        		ByteBuffer voted = voteFree(list);
-                                if(voted != null){
-                                    LOG.debug("Voted: " + voted.array());
-                                    byte[] data = new byte[voted.capacity() - 24];
-                                    voted.position(24);
-                                    voted.get(data, 0, data.length);
-                                    counter = addNewEntry(new LedgerEntry(ledgerId, entryId, voted.array()), rOp);
+                               
+                        break;
+                    case GENERIC:
+                        list = pOp.proposedValues;
+                        LOG.debug("List length before: " + list.size());
+                        
+                        synchronized(list){
+                            if(rOp.seq[(int) (entryId % (rOp.lastEntry - rOp.firstEntry + 1))] == null){
+                                list.add(bb);
+                                bb.position(24);
+                                if(list.size() >= ((lh.getQuorumSize() + 1)/2)){
+                                    voted = voteGeneric(list, (lh.getQuorumSize() + 1)/2);
                                 }
-                        	}
-                    }
-                }    
+                            }
+                        }
+                        
+                                    
+                        if(voted != null){
+                            LOG.debug("Voted: " + voted.array());
+                            byte[] data = new byte[voted.capacity() - 24];
+                            voted.position(24);
+                            voted.get(data, 0, data.length);
+                            counter = addNewEntry(new LedgerEntry(ledgerId, entryId, data), rOp);
+                        }
+                                
+                                
+                        break;
+                    case FREEFORM:
+                        list = pOp.proposedValues;
+                        LOG.debug("List length before: " + list.size());
+                        synchronized(list){
+                            if(list.size() == lh.getQuorumSize()){
+                                voted = voteFree(list);
+                            }
+                        }
+                        
+                        if(voted != null){
+                            LOG.debug("Voted: " + voted.array());
+                            byte[] data = new byte[voted.capacity() - 24];
+                            voted.position(24);
+                            voted.get(data, 0, data.length);
+                            counter = addNewEntry(new LedgerEntry(ledgerId, entryId, voted.array()), rOp);
+                        }                      
+                    }   
         
-                if((counter == (rOp.lastEntry - rOp.firstEntry + 1)) && 
-                        !sRead.op.isReady()){
-                    
-                    sRead.op.setReady();
-                    //sRead.op.cb.readComplete(0, ledgerId, new LedgerSequence(sRead.op.seq), sRead.op.ctx);
-                    //sRead.op.complete = true;
-                }
+                    if((counter == (rOp.lastEntry - rOp.firstEntry + 1)) && 
+                            !sRead.op.isReady()){
+                        
+                        sRead.op.setReady();
+                    }
             
-                LOG.debug("Counter: " + rOp.counter);
+                    long diff = rOp.lastEntry - rOp.firstEntry;
+                    //LOG.debug("Counter: " + rOp.counter + ", " + diff);
+                }
             }
         } else {
             /*
@@ -385,43 +368,40 @@ public class QuorumOpMonitor implements WriteCallback, ReadEntryCallback {
      * @return
      */
     
-    private ByteBuffer voteVerifiable(ArrayList<ByteBuffer> list) 
-    throws NoSuchAlgorithmException, BKException{
+    
+    private ByteBuffer voteVerifiable(ByteBuffer bb) 
+    throws NoSuchAlgorithmException, InvalidKeyException, BKException{
         /*
          * Check if checksum matches
          */
-        ByteBuffer bb = list.get(0);
-        list.remove(0);
         
-        MessageDigest md = getDigestInstance(lh.getDigestAlg());
-        int dlength = md.getDigestLength();
+        Mac mac = ((BookieClient) Thread.currentThread()).getMac("HmacSHA1", lh.getMacKey());
+        int dlength = mac.getMacLength();
        
-        /*
-         * TODO: The if check below is legitimate, but in reality it should never happen,
-         * bt it showed up a few times in experiments. Have to check why it is happening.
-         */
         if(bb.capacity() <= dlength){
-        	LOG.warn("Something wrong with this entry, length smaller than digest length");
-        	return null;
+            LOG.warn("Something wrong with this entry, length smaller than digest length");
+            return null;
         }
         
         byte[] data = new byte[bb.capacity() - dlength];
         bb.get(data, 0, bb.capacity() - dlength);
         
+        
         byte[] sig = new byte[dlength];
         bb.position(bb.capacity() - dlength);
         bb.get(sig, 0, dlength);
-        
+
         bb.rewind();
         
-        //LOG.warn("Data: " + data.toString() + ", Signature: " + sig.toString());
-        md.update(lh.getPasswdHash());
-        md.update(data);
-        if(MessageDigest.isEqual(md.digest(), sig)){
+        byte[] msgDigest = mac.doFinal(data);
+        if(Arrays.equals(mac.doFinal(data), sig)){
+            
             return bb;
         } else {
+            LOG.error("Entry id: " + new String(msgDigest) + new String(sig));
             throw BKException.create(Code.DigestMatchException);
         }
+        
     }
     
     /**
@@ -434,15 +414,16 @@ public class QuorumOpMonitor implements WriteCallback, ReadEntryCallback {
         
     private ByteBuffer voteGeneric(ArrayList<ByteBuffer> list, int threshold){  
         HashMap<ByteBuffer, Integer> map = new HashMap<ByteBuffer, Integer>();
-        for(ByteBuffer bb : list){
+        for(ByteBuffer bb : list){  
             if(!map.containsKey(bb)){
-                map.put(bb, Integer.valueOf(0));
-            }
+                map.put(bb, new Integer(0));
+            } else LOG.debug("Not equal");
             
-            map.put(bb, map.get(bb) + 1);
+            if(bb != null)
+                map.put(bb, map.get(bb) + 1);
             
             if(map.get(bb) >= threshold)
-                return bb;
+                return bb;  
         }
         
         return null;   
@@ -459,6 +440,7 @@ public class QuorumOpMonitor implements WriteCallback, ReadEntryCallback {
     private ByteBuffer voteFree(ArrayList<ByteBuffer> list){  
         HashMap<ByteBuffer, Integer> map = new HashMap<ByteBuffer, Integer>();
         for(ByteBuffer bb : list){
+            bb.position(24);
             if(!map.containsKey(bb)){
                 map.put(bb, Integer.valueOf(0));
             }
@@ -483,11 +465,6 @@ public class QuorumOpMonitor implements WriteCallback, ReadEntryCallback {
         if(op.seq[(int) index] == null){
             op.seq[(int) index] = le;
             
-            if(le.getEntry() != null)
-                LOG.debug("Adding entry: " + le.getEntryId() + ", " + le.getEntry().length);
-            else
-                LOG.debug("Entry is null: " + le.getEntryId());
-            
             return op.counter.incrementAndGet();
         }
         

+ 71 - 31
src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/BookieClient.java

@@ -30,7 +30,12 @@ import java.nio.channels.SocketChannel;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.Semaphore;
-
+import java.util.Arrays;
+import java.security.NoSuchAlgorithmException;
+import java.security.InvalidKeyException;
+import java.security.MessageDigest;
+import javax.crypto.Mac; 
+import javax.crypto.spec.SecretKeySpec;
 
 import org.apache.bookkeeper.proto.ReadEntryCallback;
 import org.apache.bookkeeper.proto.WriteCallback;
@@ -51,7 +56,7 @@ public class BookieClient extends Thread {
     throws IOException, ConnectException {
         sock = SocketChannel.open(addr);
         setDaemon(true);
-        //sock.configureBlocking(false);
+   
         sock.socket().setSoTimeout(recvTimeout);
         sock.socket().setTcpNoDelay(true);
         start();
@@ -102,8 +107,8 @@ public class BookieClient extends Thread {
     ConcurrentHashMap<CompletionKey, Completion<WriteCallback>> addCompletions = new ConcurrentHashMap<CompletionKey, Completion<WriteCallback>>();
     ConcurrentHashMap<CompletionKey, Completion<ReadEntryCallback>> readCompletions = new ConcurrentHashMap<CompletionKey, Completion<ReadEntryCallback>>();
     
-    Object writeLock = new Object();
-    Object readLock = new Object();
+    //Object writeLock = new Object();
+    //Object readLock = new Object();
     
     /*
      * Use this semaphore to control the number of completion key in both addCompletions
@@ -113,6 +118,41 @@ public class BookieClient extends Thread {
     Semaphore completionSemaphore = new Semaphore(1000);
     
    
+    /**
+     * Message disgest instance
+     * 
+     */
+    MessageDigest digest = null;
+    
+    /** 
+     * Get digest instance if there is none.
+     * 
+     */
+    public MessageDigest getDigestInstance(String alg)
+    throws NoSuchAlgorithmException {
+        if(digest == null){
+            digest = MessageDigest.getInstance(alg);
+        }
+        
+        return digest;
+    }
+    
+    /**
+     * Mac instance
+     * 
+     */
+    Mac mac = null;
+    
+    public Mac getMac(String alg, byte[] key)
+    throws NoSuchAlgorithmException, InvalidKeyException {
+        if(mac == null){
+            mac = Mac.getInstance(alg);
+            mac.init(new SecretKeySpec(key, "HmacSHA1"));
+        }
+        
+        return mac;
+    }
+    
     /**
      * Send addEntry operation to bookie.
      * 
@@ -123,7 +163,7 @@ public class BookieClient extends Thread {
      * @throws IOException
      * @throws InterruptedException
      */
-    public void addEntry(long ledgerId, long entryId,
+    synchronized public void addEntry(long ledgerId, byte[] masterKey, long entryId,
             ByteBuffer entry, WriteCallback cb, Object ctx) 
     throws IOException, InterruptedException {
         
@@ -132,12 +172,14 @@ public class BookieClient extends Thread {
         addCompletions.put(new CompletionKey(ledgerId, entryId),
                 new Completion<WriteCallback>(cb, ctx));
         //entry = entry.duplicate();
-        entry.position(0);
+        //entry.position(0);
         
-        ByteBuffer tmpEntry = ByteBuffer.allocate(entry.capacity() + 8 + 8 + 8);
+        ByteBuffer tmpEntry = ByteBuffer.allocate(entry.remaining() + 44);
 
         tmpEntry.position(4);
         tmpEntry.putInt(BookieProtocol.ADDENTRY);
+        tmpEntry.put(masterKey);
+        //LOG.debug("Master key: " + new String(masterKey));
         tmpEntry.putLong(ledgerId);
         tmpEntry.putLong(entryId);
         tmpEntry.put(entry);
@@ -147,14 +189,12 @@ public class BookieClient extends Thread {
         // 4 bytes for the message type
         tmpEntry.putInt(tmpEntry.remaining() - 4);
         tmpEntry.position(0);
-        synchronized(writeLock) {
-            //sock.write(len);
-            //len.clear();
-            //len.putInt(BookieProtocol.ADDENTRY);
-            //len.flip();
-            //sock.write(len);
-            sock.write(tmpEntry);
-        }
+        //sock.write(len);
+        //len.clear();
+        //len.putInt(BookieProtocol.ADDENTRY);
+        //len.flip();
+        //sock.write(len);
+        sock.write(tmpEntry);
         //LOG.debug("addEntry:finished");
     }
     
@@ -167,30 +207,30 @@ public class BookieClient extends Thread {
      * @param ctx		control object
      * @throws IOException
      */
-    public void readEntry(long ledgerId, long entryId,
+    synchronized public void readEntry(long ledgerId, long entryId,
             ReadEntryCallback cb, Object ctx) 
     throws IOException, InterruptedException {
     	
     	completionSemaphore.acquire();
         readCompletions.put(new CompletionKey(ledgerId, entryId),
                 new Completion<ReadEntryCallback>(cb, ctx));
-        ByteBuffer tmpEntry = ByteBuffer.allocate(8 + 8);
+        ByteBuffer tmpEntry = ByteBuffer.allocate(8 + 8 + 8);
+        tmpEntry.putInt(20);
+        tmpEntry.putInt(BookieProtocol.READENTRY);
         tmpEntry.putLong(ledgerId);
         tmpEntry.putLong(entryId);
         tmpEntry.position(0);
         
-        ByteBuffer len = ByteBuffer.allocate(4);
-        len.putInt(tmpEntry.remaining() + 4);
-        len.flip();
+        //ByteBuffer len = ByteBuffer.allocate(4);
+        //len.putInt(tmpEntry.remaining() + 4);
+        //len.flip();
         //LOG.debug("readEntry: Writing to socket");
-        synchronized(readLock) {
-            sock.write(len);
-            len.clear();
-            len.putInt(BookieProtocol.READENTRY);
-            len.flip();
-            sock.write(len);
-            sock.write(tmpEntry);
-        }
+        //sock.write(len);
+        //len.clear();
+        //len.putInt(BookieProtocol.READENTRY);
+        //len.flip();
+        //sock.write(len);
+        sock.write(tmpEntry);
         //LOG.error("Size of readCompletions: " + readCompletions.size());
     }
     
@@ -241,8 +281,8 @@ public class BookieClient extends Thread {
                     byte[] data = new byte[bb.capacity() - 24];
                     bb.get(data);
                     ByteBuffer entryData = ByteBuffer.wrap(data);
-                    
-                    LOG.info("Received entry: " + ledgerId + ", " + entryId + ", " + rc + ", " + entryData.array().length + ", " + bb.array().length + ", " + bb.remaining());
+                    //ByteBuffer entryData = bb;
+                    //LOG.info("Received entry: " + ledgerId + ", " + entryId + ", " + rc + ", " + entryData.array().length + ", " + bb.array().length + ", " + bb.remaining());
           
                     
                     CompletionKey key = new CompletionKey(ledgerId, entryId);
@@ -334,7 +374,7 @@ public class BookieClient extends Thread {
             entry.put(hello);
             entry.flip();
             counter.inc();
-            bc.addEntry(ledger, i, entry, cb, counter);
+            bc.addEntry(ledger, new byte[0], i, entry, cb, counter);
         }
         counter.wait(0);
         System.out.println("Total = " + counter.total());

+ 7 - 1
src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/BookieProtocol.java

@@ -66,5 +66,11 @@ public interface BookieProtocol {
     /**
      * General error occurred at the server
      */
-    public static final int EIO = 0;
+    public static final int EIO = 101;
+    
+    /**
+     * Unauthorized access to ledger
+     */
+    public static final int EUA = 102;
+    
 }

+ 17 - 1
src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/BookieServer.java

@@ -26,6 +26,7 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 
 import org.apache.bookkeeper.bookie.Bookie;
+import org.apache.bookkeeper.bookie.BookieException;
 import org.apache.bookkeeper.client.AddCallback;
 import org.apache.bookkeeper.proto.NIOServerFactory.Cnxn;
 import org.apache.log4j.Logger;
@@ -90,10 +91,14 @@ public class BookieServer implements NIOServerFactory.PacketProcessor, AddCallba
         switch(type) {
         case BookieProtocol.ADDENTRY:
             try {
-                bookie.addEntry(packet.slice(), this, src);
+                byte[] masterKey = new byte[20];
+                packet.get(masterKey, 0, 20);
+                //LOG.debug("Master key: " + new String(masterKey));
+                bookie.addEntry(packet.slice(), this, src, masterKey);
             } catch(IOException e) {
                 if (LOG.isTraceEnabled()) {
                     ByteBuffer bb = packet.duplicate();
+    
                     long ledgerId = bb.getLong();
                     long entryId = bb.getLong();
                     LOG.trace("Error reading " + entryId + "@" + ledgerId, e);
@@ -103,6 +108,17 @@ public class BookieServer implements NIOServerFactory.PacketProcessor, AddCallba
                 eio.putInt(BookieProtocol.EIO);
                 eio.flip();
                 src.sendResponse(new ByteBuffer[] {eio});
+            } catch(BookieException e){
+                ByteBuffer bb = packet.duplicate();
+                long ledgerId = bb.getLong();
+                
+                LOG.error("Unauthorized access to ledger " + ledgerId);
+                
+                ByteBuffer eio = ByteBuffer.allocate(8);
+                eio.putInt(type);
+                eio.putInt(BookieProtocol.EUA);
+                eio.flip();
+                src.sendResponse(new ByteBuffer[] {eio});
             }
             break;
         case BookieProtocol.READENTRY:

+ 10 - 6
src/contrib/bookkeeper/test/org/apache/bookkeeper/test/BookieClientTest.java

@@ -23,6 +23,7 @@ package org.apache.bookkeeper.test;
 
 import java.io.File;
 import java.nio.ByteBuffer;
+import java.util.Arrays;
 
 import org.junit.Test;
 import org.apache.bookkeeper.bookie.Bookie;
@@ -102,21 +103,24 @@ public class BookieClientTest extends TestCase {
     @Test
     public void testWriteGaps() throws Exception {
         final Object notifyObject = new Object();
+        byte[] passwd = new byte[20];
+        Arrays.fill(passwd, (byte) 'a');
+        
         BookieClient bc = new BookieClient("127.0.0.1", port, 50000);
         ByteBuffer bb;
         bb = createByteBuffer(1);
-        bc.addEntry(1, 1, bb, wrcb, null);
+        bc.addEntry(1, passwd, 1, bb, wrcb, null);
         bb = createByteBuffer(2);
-        bc.addEntry(1, 2, bb, wrcb, null);
+        bc.addEntry(1, passwd, 2, bb, wrcb, null);
         bb = createByteBuffer(3);
-        bc.addEntry(1, 3, bb, wrcb, null);
+        bc.addEntry(1, passwd, 3, bb, wrcb, null);
         bb = createByteBuffer(5);
-        bc.addEntry(1, 5, bb, wrcb, null);
+        bc.addEntry(1, passwd, 5, bb, wrcb, null);
         bb = createByteBuffer(7);
-        bc.addEntry(1, 7, bb, wrcb, null);
+        bc.addEntry(1, passwd, 7, bb, wrcb, null);
         synchronized(notifyObject) {
             bb = createByteBuffer(11);
-            bc.addEntry(1, 11, bb, wrcb, notifyObject);
+            bc.addEntry(1, passwd, 11, bb, wrcb, notifyObject);
             notifyObject.wait();
         }
         ResultStruct arc = new ResultStruct();

+ 5 - 0
src/contrib/bookkeeper/test/org/apache/bookkeeper/test/LoopbackClient.java

@@ -25,6 +25,7 @@ import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.io.IOException;
 import java.lang.InterruptedException;
+import java.util.Arrays;
 
 import org.apache.bookkeeper.proto.BookieClient;
 import org.apache.bookkeeper.proto.WriteCallback;
@@ -73,7 +74,11 @@ class LoopbackClient implements WriteCallback {
     void write(long ledgerId, long entry, byte[] data, WriteCallback cb, Object ctx)
     throws IOException, InterruptedException {
         LOG.info("Ledger id: " + ledgerId + ", Entry: " + entry);
+        byte[] passwd = new byte[20];
+        Arrays.fill(passwd, (byte) 'a');
+        
         client.addEntry(ledgerId, 
+            passwd,
             entry, 
             ByteBuffer.wrap(data), 
             cb,