Forráskód Böngészése

ZOOKEEPER-383. Asynchronous version of createLedger(). (flavio via mahadev)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/zookeeper/trunk@779433 13f79535-47bb-0310-9956-ffa450edef68
Mahadev Konar 16 éve
szülő
commit
ba65c32347

+ 2 - 0
CHANGES.txt

@@ -170,6 +170,8 @@ IMPROVEMENTS:
 
   ZOOKEEPER-292. commit configure scripts (autotools) to svn for c projects and
   include in release (phunt via breed)
+
+  ZOOKEEPER-383. Asynchronous version of createLedger(). (flavio via mahadev)
  
 NEW FEATURES:
 

+ 7 - 7
src/contrib/bookkeeper/src/java/org/apache/bookkeeper/bookie/Bookie.java

@@ -32,7 +32,7 @@ import java.util.Random;
 import java.util.concurrent.LinkedBlockingQueue;
 
 import org.apache.bookkeeper.bookie.BookieException;
-import org.apache.bookkeeper.client.AddCallback;
+import org.apache.bookkeeper.proto.WriteCallback;
 import org.apache.log4j.Logger;
 
 
@@ -204,7 +204,7 @@ public class Bookie extends Thread {
 
     static class QueueEntry {
         QueueEntry(ByteBuffer entry, long ledgerId, long entryId, 
-                AddCallback cb, Object ctx) {
+                WriteCallback cb, Object ctx) {
             this.entry = entry.duplicate();
             this.cb = cb;
             this.ctx = ctx;
@@ -218,7 +218,7 @@ public class Bookie extends Thread {
         
         long entryId;
 
-        AddCallback cb;
+        WriteCallback cb;
 
         Object ctx;
     }
@@ -248,7 +248,7 @@ public class Bookie extends Thread {
                     if (qe == null || toFlush.size() > 100) {
                         logFile.force(false);
                         for (QueueEntry e : toFlush) {
-                            e.cb.addComplete(0, e.ledgerId, e.entryId, e.ctx);
+                            e.cb.writeComplete(0, e.ledgerId, e.entryId, e.ctx);
                         }
                         toFlush.clear();
                     }
@@ -280,7 +280,7 @@ public class Bookie extends Thread {
         }
     }
     
-    public void addEntry(ByteBuffer entry, AddCallback cb, Object ctx, byte[] masterKey)
+    public void addEntry(ByteBuffer entry, WriteCallback cb, Object ctx, byte[] masterKey)
             throws IOException, BookieException {
         
         long ledgerId = entry.getLong();
@@ -315,10 +315,10 @@ public class Bookie extends Thread {
     }
 
     // The rest of the code is test stuff
-    static class CounterCallback implements AddCallback {
+    static class CounterCallback implements WriteCallback {
         int count;
 
-        synchronized public void addComplete(int rc, long l, long e, Object ctx) {
+        synchronized public void writeComplete(int rc, long l, long e, Object ctx) {
             count--;
             if (count == 0) {
                 notifyAll();

+ 0 - 37
src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/AddCallback.java

@@ -1,37 +0,0 @@
-package org.apache.bookkeeper.client;
-/*
- * 
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- * 
- */
-
-/**
- * Callback interface for add entry calls.
- */
-
-public interface AddCallback {
-	/**
-	 * Callback declaration
-	 * 
-	 * @param rc	return code
-	 * @param ledgerId	ledger identifier
-	 * @param entryId	entry identifier
-	 * @param ctx	control object
-	 */
-    void addComplete(int rc, long ledgerId, long entryId, Object ctx);
-}

+ 82 - 0
src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/AsyncCallback.java

@@ -0,0 +1,82 @@
+package org.apache.bookkeeper.client;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+public interface AsyncCallback {
+    public interface AddCallback {
+        /**
+         * Callback declaration
+         * 
+         * @param rc    return code
+         * @param ledgerId  ledger identifier
+         * @param entryId   entry identifier
+         * @param ctx   control object
+         */
+        void addComplete(int rc, LedgerHandle lh, long entryId, Object ctx);
+    }
+    
+    public interface CloseCallback {
+        /**
+         * Callback definition
+         * 
+         * @param rc    return code
+         * @param ledgerId  ledger identifier
+         * @param ctx   control object
+         */
+        void closeComplete(int rc, LedgerHandle lh, Object ctx);
+    }
+    
+    public interface CreateCallback {
+        /**
+         * Declaration of callback method
+         * 
+         * @param rc    return status
+         * @param lh    ledger handle
+         * @param ctx   control object
+         */
+        
+        void createComplete(int rc, LedgerHandle lh, Object ctx);
+    }
+    
+    public interface OpenCallback {
+        /**
+         * Callback for asynchronous call to open ledger
+         * 
+         * @param rc
+         * @param lh
+         * @param ctx
+         */
+        
+        public void openComplete(int rc, LedgerHandle lh, Object ctx);
+        
+    }
+    
+    public interface ReadCallback {
+        /**
+         * Callback declaration
+         * 
+         * @param rc    return code
+         * @param ledgerId  ledger identifier
+         * @param seq   sequence of entries
+         * @param ctx   control object
+         */
+        void readComplete(int rc, LedgerHandle lh, LedgerSequence seq, Object ctx);
+    }
+    
+}

+ 69 - 0
src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BKDefs.java

@@ -0,0 +1,69 @@
+package org.apache.bookkeeper.client;
+
+/*
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * 
+ */
+
+public interface BKDefs { 
+    /**
+     * String used to construct znode paths. They are used in BookKeeper
+     *  and LedgerManagementProcessor.
+     */
+    static public final String prefix = "/ledgers/L";
+    static public final String ensemble = "/ensemble"; 
+    static public final String quorumSize = "/quorum";
+    static public final String close = "/close";
+    static public final String quorumMode = "/mode";
+    
+    /**
+     * Status ok
+     */
+    public final int EOK = 0;
+    
+    /**
+     * Insufficient bookies
+     */
+    public final int EIB = -1;
+ 
+    /**
+     * No such a ledger
+     */
+    public final int ENL = -2;
+    
+    /**
+     * Error while recovering ledger
+     */
+    public final int ERL = -3;
+    
+    /**
+     * Error while reading from zookeeper or writing to zookeeper
+     */
+    public final int EZK = -4;
+
+    /**
+     * IO error, typically when trying to connect to a bookie
+     */
+    public final int EIO = -5;
+    
+    /**
+     * Exceeded number of retries
+     */
+    public final int ENR = -6;
+}

+ 87 - 39
src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BookKeeper.java

@@ -34,8 +34,12 @@ import java.net.InetSocketAddress;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookieHandle;
 import org.apache.bookkeeper.client.LedgerSequence;
+import org.apache.bookkeeper.client.AsyncCallback.CreateCallback;
+import org.apache.bookkeeper.client.AsyncCallback.OpenCallback;
 import org.apache.bookkeeper.client.BKException.Code;
 import org.apache.bookkeeper.client.LedgerHandle.QMode;
+import org.apache.bookkeeper.client.LedgerManagementProcessor.CreateLedgerOp;
+import org.apache.bookkeeper.client.LedgerManagementProcessor.OpenLedgerOp;
 import org.apache.log4j.Logger;
 
 import org.apache.zookeeper.data.Stat;
@@ -58,23 +62,21 @@ import org.apache.zookeeper.WatchedEvent;
 
 public class BookKeeper 
 implements Watcher {
-    /**
-     * the chain of classes to get a client 
-     * request from the bookeeper class to the 
-     * server is 
-     * bookkeeper->quorumengine->bookiehandle->bookieclient
-     * 
-     */
+ 
     Logger LOG = Logger.getLogger(BookKeeper.class);
-
-    static public final String prefix = "/ledgers/L";
-    static public final String ensemble = "/ensemble"; 
-    static public final String quorumSize = "/quorum";
-    static public final String close = "/close";
-    static public final String quorumMode = "/mode";
     
     ZooKeeper zk = null;
-    QuorumEngine engine = null;
+    
+    /*
+     * The ledgerMngProcessor is a thread that processes
+     * asynchronously requests that handle ledgers, such
+     * as create, open, and close.
+     */
+    private static LedgerManagementProcessor ledgerMngProcessor;
+    
+    /*
+     * Blacklist of bookies
+     */
     HashSet<InetSocketAddress> bookieBlackList;
     
     LedgerSequence responseRead;
@@ -86,8 +88,6 @@ implements Watcher {
         //Create ZooKeeper object
         this.zk = new ZooKeeper(servers, 10000, this);
         
-        //Create hashmap for quorum engines
-        //this.qeMap = new HashMap<Long, ArrayBlockingQueue<Operation> >();
         //List to enable clients to blacklist bookies
         this.bookieBlackList = new HashSet<InetSocketAddress>();
     }
@@ -99,8 +99,6 @@ implements Watcher {
         LOG.debug("Process: " + event.getType() + " " + event.getPath());
     }
     
-
-    
     /**
      * Formats ledger ID according to ZooKeeper rules
      * 
@@ -118,6 +116,14 @@ implements Watcher {
         return zk;
     }
     
+    LedgerManagementProcessor getMngProcessor(){
+        if (ledgerMngProcessor == null){
+            ledgerMngProcessor = new LedgerManagementProcessor(this);
+            ledgerMngProcessor.start();
+        }
+        return ledgerMngProcessor;
+    }
+    
     /**
      * Creates a new ledger. To create a ledger, we need to specify the ensemble
      * size, the quorum size, the operation mode, and a password. The ensemble size
@@ -158,7 +164,7 @@ implements Watcher {
          * Create ledger node on ZK.
          * We get the id from the sequence number on the node.
          */
-        String path = zk.create(prefix, new byte[0], 
+        String path = zk.create(BKDefs.prefix, new byte[0], 
                 Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
         /* 
          * Extract ledger id.
@@ -175,21 +181,21 @@ implements Watcher {
         /* 
          * Select ensSize servers to form the ensemble
          */
-        path = zk.create(prefix + getZKStringId(lId) + ensemble, new byte[0], 
+        path = zk.create(BKDefs.prefix + getZKStringId(lId) + BKDefs.ensemble, new byte[0], 
                 Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
         /* 
          * Add quorum size to ZK metadata
          */
         ByteBuffer bb = ByteBuffer.allocate(4);
         bb.putInt(qSize);
-        zk.create(prefix + getZKStringId(lId) + quorumSize, bb.array(), 
+        zk.create(BKDefs.prefix + getZKStringId(lId) + BKDefs.quorumSize, bb.array(), 
                 Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
         /* 
          * Quorum mode
          */
         bb = ByteBuffer.allocate(4);
         bb.putInt(mode.ordinal());
-        zk.create(prefix + getZKStringId(lId) + quorumMode, bb.array(), 
+        zk.create(BKDefs.prefix + getZKStringId(lId) + BKDefs.quorumMode, bb.array(), 
                 Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
         /* 
          * Create QuorumEngine
@@ -222,7 +228,7 @@ implements Watcher {
         	    bindexBuf.putInt(bindex);
         	    
         	    String pBookie = "/" + bookie;
-        	    zk.create(prefix + getZKStringId(lId) + ensemble + pBookie, bindexBuf.array(), 
+        	    zk.create(BKDefs.prefix + getZKStringId(lId) + BKDefs.ensemble + pBookie, bindexBuf.array(), 
         	            Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
         	} catch (IOException e) {
         	    LOG.error(e);
@@ -246,7 +252,39 @@ implements Watcher {
         return createLedger(3, 2, QMode.VERIFIABLE, passwd);
     }
 
-
+    /**
+     * Asychronous call to create ledger
+     * 
+     * @param ensSize
+     * @param qSize
+     * @param mode
+     * @param passwd
+     * @param cb
+     * @param ctx
+     * @throws KeeperException
+     * @throws InterruptedException
+     * @throws IOException
+     * @throws BKException
+     */
+    public void asyncCreateLedger(int ensSize, 
+            int qSize, 
+            QMode mode,  
+            byte passwd[],
+            CreateCallback cb,
+            Object ctx
+            )
+    throws KeeperException, InterruptedException, 
+    IOException, BKException {
+        CreateLedgerOp op = new CreateLedgerOp(ensSize, 
+                qSize, 
+                mode, 
+                passwd, 
+                cb, 
+                ctx);
+        LedgerManagementProcessor lmp = getMngProcessor();
+        lmp.addOp(op);
+        
+    }
     
     /**
      * Open existing ledger for reading. Default for quorum size is 2.
@@ -263,7 +301,7 @@ implements Watcher {
         /*
          * Check if ledger exists
          */
-        if(zk.exists(prefix + getZKStringId(lId), false) == null){
+        if(zk.exists(BKDefs.prefix + getZKStringId(lId), false) == null){
             LOG.error("Ledger " + getZKStringId(lId) + " doesn't exist.");
             return null;
         }
@@ -271,7 +309,7 @@ implements Watcher {
         /*
          * Get quorum size.
          */
-        ByteBuffer bb = ByteBuffer.wrap(zk.getData(prefix + getZKStringId(lId) + quorumSize, false, stat));
+        ByteBuffer bb = ByteBuffer.wrap(zk.getData(BKDefs.prefix + getZKStringId(lId) + BKDefs.quorumSize, false, stat));
         int qSize = bb.getInt();
          
         /*
@@ -279,21 +317,21 @@ implements Watcher {
          */
         
         long last = 0;
-        LOG.debug("Close path: " + prefix + getZKStringId(lId) + close);
-        if(zk.exists(prefix + getZKStringId(lId) + close, false) == null){
+        LOG.debug("Close path: " + BKDefs.prefix + getZKStringId(lId) + BKDefs.close);
+        if(zk.exists(BKDefs.prefix + getZKStringId(lId) + BKDefs.close, false) == null){
             recoverLedger(lId, passwd);
         }
             
         stat = null;
-        byte[] data = zk.getData(prefix + getZKStringId(lId) + close, false, stat);
+        byte[] data = zk.getData(BKDefs.prefix + getZKStringId(lId) + BKDefs.close, false, stat);
         ByteBuffer buf = ByteBuffer.wrap(data);
         last = buf.getLong();
-        //zk.delete(prefix + getZKStringId(lId) + close, -1);
+        //zk.delete(BKDefs.prefix + getZKStringId(lId) + BKDefs.close, -1);
         
         /*
          * Quorum mode 
          */
-        data = zk.getData(prefix + getZKStringId(lId) + quorumMode, false, stat);
+        data = zk.getData(BKDefs.prefix + getZKStringId(lId) + BKDefs.quorumMode, false, stat);
         buf = ByteBuffer.wrap(data);
         //int ordinal = buf.getInt();
         
@@ -321,12 +359,12 @@ implements Watcher {
          */
         
         List<String> list = 
-            zk.getChildren(prefix + getZKStringId(lId) + ensemble, false);
+            zk.getChildren(BKDefs.prefix + getZKStringId(lId) + BKDefs.ensemble, false);
         
         LOG.info("Length of list of bookies: " + list.size());
         for(int i = 0 ; i < list.size() ; i++){
             for(String s : list){
-                byte[] bindex = zk.getData(prefix + getZKStringId(lId) + ensemble + "/" + s, false, stat);
+                byte[] bindex = zk.getData(BKDefs.prefix + getZKStringId(lId) + BKDefs.ensemble + "/" + s, false, stat);
                 ByteBuffer bindexBuf = ByteBuffer.wrap(bindex);
                 if(bindexBuf.getInt() == i){                      
                     try{
@@ -342,13 +380,23 @@ implements Watcher {
         return lh;
     }    
     
+    public void asyncOpenLedger(long lId, byte passwd[], OpenCallback cb, Object ctx)
+    throws InterruptedException{
+        OpenLedgerOp op = new OpenLedgerOp(lId, 
+                passwd,  
+                cb, 
+                ctx);
+        LedgerManagementProcessor lmp = getMngProcessor();
+        lmp.addOp(op);
+    }
+    
     /**
      * Parses address into IP and port.
      * 
      *  @param addr	String
      */
     
-    private InetSocketAddress parseAddr(String s){
+    InetSocketAddress parseAddr(String s){
         String parts[] = s.split(":");
         if (parts.length != 2) {
             System.out.println(s
@@ -367,7 +415,7 @@ implements Watcher {
      */
     public boolean hasClosed(long ledgerId)
     throws KeeperException, InterruptedException{
-        String closePath = prefix + getZKStringId(ledgerId) + close;
+        String closePath = BKDefs.prefix + getZKStringId(ledgerId) + BKDefs.close;
         if(zk.exists(closePath, false) == null) return false;
         else return true;
     }
@@ -379,7 +427,7 @@ implements Watcher {
      * @param passwd	password
      */
     
-    private boolean recoverLedger(long lId, byte passwd[])
+    boolean recoverLedger(long lId, byte passwd[])
     throws KeeperException, InterruptedException, IOException, BKException {
         
         Stat stat = null;
@@ -389,7 +437,7 @@ implements Watcher {
         /*
          * Get quorum size.
          */
-        ByteBuffer bb = ByteBuffer.wrap(zk.getData(prefix + getZKStringId(lId) + quorumSize, false, stat));
+        ByteBuffer bb = ByteBuffer.wrap(zk.getData(BKDefs.prefix + getZKStringId(lId) + BKDefs.quorumSize, false, stat));
         int qSize = bb.getInt();
                 
         
@@ -398,7 +446,7 @@ implements Watcher {
          */
         
         List<String> list = 
-            zk.getChildren(prefix + getZKStringId(lId) + ensemble, false);
+            zk.getChildren(BKDefs.prefix + getZKStringId(lId) + BKDefs.ensemble, false);
         
         ArrayList<InetSocketAddress> addresses = new ArrayList<InetSocketAddress>();
         for(String s : list){
@@ -408,7 +456,7 @@ implements Watcher {
         /*
          * Quorum mode 
          */
-        byte[] data = zk.getData(prefix + getZKStringId(lId) + quorumMode, false, stat);
+        byte[] data = zk.getData(BKDefs.prefix + getZKStringId(lId) + BKDefs.quorumMode, false, stat);
         ByteBuffer buf = ByteBuffer.wrap(data);
         //int ordinal = buf.getInt();
             

+ 6 - 5
src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/ClientCBWorker.java

@@ -115,17 +115,18 @@ class ClientCBWorker extends Thread{
                     
                         aOp.getLedger().setAddConfirmed(aOp.entry);
                         aOp.cb.addComplete(aOp.getErrorCode(),
-                            aOp.getLedger().getId(), aOp.entry, 
-                            aOp.ctx);
+                                aOp.getLedger(),
+                                aOp.entry, 
+                                aOp.ctx);
                         
                         break;
                     case Operation.READ:
                         ReadOp rOp = (ReadOp) op;
                         //LOG.debug("Got one message from the queue: " + rOp.firstEntry);
                         rOp.cb.readComplete(rOp.getErrorCode(), 
-                            rOp.getLedger().getId(), 
-                            new LedgerSequence(rOp.seq), 
-                            rOp.ctx);
+                                rOp.getLedger(),
+                                new LedgerSequence(rOp.seq), 
+                                rOp.ctx);
                         break;
                     }
                 }

+ 0 - 29
src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/ErrorCodes.java

@@ -1,29 +0,0 @@
-package org.apache.bookkeeper.client;
-/*
- * 
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- * 
- */
-
-
-public class ErrorCodes {
-
-    static final int ENUMRETRIES = -1;
-    static final int ENOAVAILABLEBOOKIE = -2;
-    
-}

+ 22 - 4
src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerHandle.java

@@ -31,7 +31,11 @@ import java.util.ArrayList;
 
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookieHandle;
+import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
+import org.apache.bookkeeper.client.AsyncCallback.CloseCallback;
+import org.apache.bookkeeper.client.AsyncCallback.ReadCallback;
 import org.apache.bookkeeper.client.BKException.Code;
+import org.apache.bookkeeper.client.LedgerManagementProcessor.CloseLedgerOp;
 import org.apache.bookkeeper.client.QuorumEngine.Operation;
 import org.apache.bookkeeper.client.QuorumEngine.Operation.AddOp;
 import org.apache.bookkeeper.client.QuorumEngine.Operation.ReadOp;
@@ -185,7 +189,7 @@ public class LedgerHandle implements ReadCallback, AddCallback {
     /** get the quorum engine
      * @return return the quorum engine
      */
-    QuorumEngine getQuorumEngine(QuorumEngine qe) {
+    QuorumEngine getQuorumEngine() {
         return this.qe;
     }
     
@@ -446,7 +450,7 @@ public class LedgerHandle implements ReadCallback, AddCallback {
         ByteBuffer last = ByteBuffer.allocate(8);
         last.putLong(lastAddConfirmed);
         LOG.info("Last saved on ZK is: " + lastAddConfirmed);
-        String closePath = BookKeeper.prefix + bk.getZKStringId(getId()) + BookKeeper.close; 
+        String closePath = BKDefs.prefix + bk.getZKStringId(getId()) + BKDefs.close; 
         if(bk.getZooKeeper().exists(closePath, false) == null){
            bk.getZooKeeper().create(closePath, 
                    last.array(), 
@@ -461,6 +465,20 @@ public class LedgerHandle implements ReadCallback, AddCallback {
         qe.sendOp(sOp);
     }
     
+    /**
+     * Asynchronous close
+     *
+     * @param cb    callback implementation
+     * @param ctx   control object
+     * @throws InterruptedException
+     */
+    public void asyncClose(CloseCallback cb, Object ctx)
+    throws InterruptedException {
+        CloseLedgerOp op = new CloseLedgerOp(this, cb, ctx);
+        LedgerManagementProcessor lmp = bk.getMngProcessor();
+        lmp.addOp(op);  
+    }
+       
     /**
      * Read a sequence of entries asynchronously.
      * 
@@ -552,7 +570,7 @@ public class LedgerHandle implements ReadCallback, AddCallback {
      * @param ctx   control object
      */
     public void readComplete(int rc, 
-            long ledger, 
+            LedgerHandle lh,
             LedgerSequence seq,  
             Object ctx){        
         
@@ -571,7 +589,7 @@ public class LedgerHandle implements ReadCallback, AddCallback {
      * @param ctx   control object
      */
     public void addComplete(int rc, 
-            long ledger, 
+            LedgerHandle lh,
             long entry, 
             Object ctx){          
         RetCounter counter = (RetCounter) ctx;

+ 1144 - 0
src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerManagementProcessor.java

@@ -0,0 +1,1144 @@
+/*
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * 
+ */
+
+package org.apache.bookkeeper.client;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.bookkeeper.client.AsyncCallback.CloseCallback;
+import org.apache.bookkeeper.client.AsyncCallback.CreateCallback;
+import org.apache.bookkeeper.client.AsyncCallback.OpenCallback;
+import org.apache.bookkeeper.client.BKException.Code;
+import org.apache.bookkeeper.client.LedgerHandle.QMode;
+import org.apache.bookkeeper.client.QuorumEngine.Operation.StopOp;
+import org.apache.zookeeper.AsyncCallback.StatCallback;
+import org.apache.zookeeper.AsyncCallback.StringCallback;
+import org.apache.zookeeper.AsyncCallback.ChildrenCallback;
+import org.apache.zookeeper.AsyncCallback.DataCallback;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.data.Stat;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooKeeper;
+
+import org.apache.log4j.Logger;
+
+public class LedgerManagementProcessor 
+extends Thread 
+implements StatCallback, StringCallback, ChildrenCallback, DataCallback {
+    
+   Logger LOG = Logger.getLogger(LedgerManagementProcessor.class);
+    
+    static final int MAXATTEMPTS = 3;
+    
+    /**
+     *  Types of ledger operations: CREATE, OPEN, CLOSE
+     */
+    static enum OpType {CREATE, OPEN, CLOSE};
+    
+    /**
+     * Operation descriptor for asynchronous execution. 
+     *
+     */
+    static class LedgerOp {
+        private OpType op;
+        private int action;
+        private int rc = 0;
+        private Object ctx;
+        private LedgerHandle lh;
+
+        /**
+         * Constructor sets the operation type.
+         * 
+         * @param op    operation type
+         */
+        LedgerOp(OpType op, Object ctx){
+            this.op = op;
+            this.ctx = ctx;
+            this.action = 0;
+        }        
+        
+        /**
+         * Returns the operation type.
+         * 
+         * @return OpType
+         */
+        OpType getType(){
+            return op;
+        }
+
+        /**
+         * Set value of action
+         * 
+         * @return
+         */
+        int setAction(int action){
+            return this.action = action;
+        }
+        
+        /**
+         * Return value of action
+         * 
+         * @return
+         */
+        int getAction(){
+            return action;
+        }
+        
+        /**
+         * Set the return code
+         * 
+         * @param rc return code
+         */
+        void setRC(int rc){
+            this.rc = rc;
+        }
+        
+        /**
+         * Return return code
+         * 
+         * @return
+         */
+        int getRC(){
+            return rc;
+        }
+        
+        /**
+         * Return control object
+         * 
+         * @return Object   control object
+         */
+        Object getCtx(){
+            return ctx;
+        }
+        
+        /**
+         * Set ledger handle
+         * 
+         * @param lh ledger handle
+         */
+        
+        void setLh(LedgerHandle lh){
+            this.lh = lh;
+        }
+        
+        /**
+         * Return ledger handle
+         * 
+         * @return LedgerHandle ledger handle
+         */
+        LedgerHandle getLh(){
+            return this.lh;
+        }
+    }
+
+    /**
+     * Create ledger descriptor for asynchronous execution.
+     */
+    static class CreateLedgerOp extends LedgerOp {
+        private long lId;
+        private int ensSize;
+        private int qSize; 
+        private QMode mode;  
+        private byte passwd[];
+        
+        private CreateCallback cb;
+        
+        private List<String> available;
+        private String path;
+        
+        AtomicInteger zkOpCounter;
+        
+        /**
+         * Constructor of request to create a new ledger.
+         * 
+         * @param ensSize   ensemble size
+         * @param qSize     quorum size
+         * @param mode      quorum mode (VERIFIABLE or GENERIC)
+         * @param passwd    password
+         * @param cb        create callback implementation
+         * @param ctx       control object
+         */
+        CreateLedgerOp(int ensSize,
+                int qSize, 
+                QMode mode,  
+                byte passwd[],
+                CreateCallback cb,
+                Object ctx)
+                throws BKException{
+            super(OpType.CREATE, ctx);
+            this.ensSize = ensSize;
+            this.qSize = qSize;
+            this.mode = mode;
+            this.passwd = passwd;
+            this.cb = cb;
+            
+            /*
+             * There are 5 fixed ZK operations, and a variable
+             * number to set the bookies of the new ledger. We
+             * initialize it with 5 and increment as we add bookies
+             * in action 2. 
+             */
+            this.zkOpCounter = new AtomicInteger(5);
+            
+            // Check that quorum size follows the minimum
+            long t;
+            switch(mode){
+            case VERIFIABLE:
+                t = java.lang.Math.round(java.lang.Math.floor((ensSize - 1)/2));
+                if(t == 0){
+                    throw BKException.create(Code.QuorumException);
+                }
+                break;
+            case GENERIC:
+                t = java.lang.Math.round(java.lang.Math.floor((ensSize - 1)/3));
+                if(t == 0){
+                    throw BKException.create(Code.QuorumException);
+                }
+                break;
+            case FREEFORM:
+                break;
+            }
+        }
+        
+        /**
+         * Constructor for cloning. This is necessary because there
+         * are create actions that issue multiple ZK operations, and 
+         * when we queue back the result of the operation we need the
+         * operation object to reflect the result of the operation.
+         * 
+         * @param op
+         */
+        CreateLedgerOp(CreateLedgerOp op){
+            super(OpType.CREATE, op.getCtx());
+            setRC(op.getRC());
+            setAction(op.getAction());
+            
+            this.setLh(op.getLh());
+            this.lId = op.getLid();
+            this.ensSize = op.getEnsembleSize();
+            this.qSize = op.getQuorumSize();
+            this.mode = op.getMode();
+            this.passwd = op.getPasswd();
+            this.cb = op.getCb();
+            this.available = op.getAvailable();
+            this.path = op.getPath(); 
+            this.zkOpCounter = op.zkOpCounter;
+        }
+        
+        /**
+         * Set ledger identifier (sequence number
+         * of ZooKeeper)
+         * 
+         * @param lId
+         */
+        void setLid(long lId){
+            this.lId = lId;
+        }
+        
+        /**
+         * Return ledger identifier
+         * 
+         * @return long ledger identifier
+         */
+        long getLid(){
+            return lId;
+        }
+        
+        /**
+         * Return ensemble size
+         * 
+         * @return int ensemble size
+         */
+        int getEnsembleSize(){
+            return ensSize;
+        }
+        
+        /**
+         * Return quorum size
+         * 
+         * @return int quorum size
+         */
+        int getQuorumSize(){
+           return qSize; 
+        }
+        
+        /**
+         * Return quorum mode
+         * 
+         * @return  QMode   quorum mode
+         */
+        QMode getMode(){
+            return mode;   
+        }
+        
+        /**
+         * Return password
+         * 
+         * @return byte[] passwd
+         */
+        byte[] getPasswd(){
+            return passwd;
+        }
+        
+        /**
+         * Return callback implementation
+         * 
+         * @return CreateCallback   callback implementation
+         */
+        CreateCallback getCb(){
+            return cb;
+        }
+        
+        
+        
+        /**
+         * Set the list of available bookies for processing
+         * 
+         * @param available lost of available bookies
+         */
+        void addAvailable(List<String> available){
+            this.available = available;
+        }
+        
+        /**
+         * Return list of bookies available
+         * 
+         * @return List<String> list of bookies available
+         */
+        List<String> getAvailable(){
+            return available;
+        }
+        
+        /**
+         * Set path as returned in the callback
+         * 
+         * @param path  created path
+         */
+        void setPath(String path){
+            this.path = path;
+        }
+
+        /**
+         * Return path
+         * 
+         * @return String   path
+         */
+        String getPath(){
+            return path;
+        }
+    }
+    
+    /**
+     * Open ledger descriptor for asynchronous execution.
+     */
+    static class OpenLedgerOp extends LedgerOp {
+        private long lId; 
+        private byte passwd[];
+        private OpenCallback cb;
+        
+        private int qSize;
+        private long last;
+        private QMode qMode;
+        private List<String> bookieIds;
+
+        /**
+         * Constructor of request to open a ledger.
+         * 
+         * @param lId   ledger identifier
+         * @param passwd    password to access ledger
+         */
+        OpenLedgerOp(long lId, 
+                byte passwd[],
+                OpenCallback cb,
+                Object ctx){
+            super(OpType.OPEN, ctx);
+            this.lId = lId;
+            this.passwd = passwd;
+        }
+        
+        /**
+         * Return ledger identifier
+         * 
+         * @return long
+         */
+        long getLid(){
+            return lId;
+        }
+        
+        /**
+         * Return password
+         * @return byte[]
+         */
+        byte[] getPasswd(){
+            return passwd;
+        }
+
+        /**
+         * Return callback object
+         * 
+         * @return OpenCallback 
+         */
+        OpenCallback getCb(){
+            return this.cb;
+        }
+             
+        /**
+         * Set quorum size as extracted from ZK
+         * 
+         * @param data  znode data
+         */
+        void setQSize(byte[] data){
+            ByteBuffer bb = ByteBuffer.wrap(data);
+            this.qSize = bb.getInt();
+        }
+        
+        /**
+         * Return quorum size
+         * 
+         * @return  int quorum size
+         */
+        int getQSize(){
+            return qSize;
+        }
+        
+        /**
+         * Set last value as read from close znode
+         * 
+         * @param last
+         */
+        void setLast(long last){
+            this.last = last;
+        }
+        
+        /**
+         * Return last value
+         * 
+         * @return long last value
+         */
+        long getLast(){
+            return last;
+        }
+        
+        /**
+         * Set ledger mode 
+         *    
+         * @param mode  GENERIC or VERIFIABLE
+         */
+        void setQMode(QMode mode){
+            this.qMode = mode;
+        }
+        
+        /**
+         * Return ledger mode
+         * 
+         * @return QMode   ledger mode
+         */
+        QMode getQMode(){
+            return qMode;
+        }
+        
+        /**
+         * Set list of bookie identifiers
+         * 
+         * @param list  list of bbokie identifiers
+         */
+        void addBookieIds(List<String> list){
+            this.bookieIds = list;
+        }
+        
+        /**
+         * Return list of bookie identifiers
+         * 
+         * @return List<String> list of bookie identifiers
+         */
+        List<String> getBookieIds(){
+            return bookieIds;
+        }
+    }
+    
+    /**
+     * Close ledger descriptor for asynchronous execution.
+     */
+    static class CloseLedgerOp extends LedgerOp {
+        private long lid;
+        private ByteBuffer last;
+        private String closePath;
+        private CloseCallback cb;
+        private Stat stat;
+        
+        /**
+         * Constructor of request to close a ledger
+         * 
+         * @param lh    ledger handle
+         */
+        CloseLedgerOp(LedgerHandle lh, 
+                CloseCallback cb,
+                Object ctx){
+            super(OpType.CLOSE, ctx);
+       
+            this.setLh(lh);
+            this.lid = lh.getId();
+            this.last = ByteBuffer.allocate(8);
+            this.last.putLong(lh.getAddConfirmed());
+            this.cb = cb;
+        }
+        
+        /**
+         * Return a ByteBuffer containing the last entry written
+         * 
+         * @return ByteBuffer identifier of last entry
+         */
+        ByteBuffer getLast(){
+            return last;
+        }
+        
+        /**
+         * Return ledger identifier
+         * 
+         * @return long
+         */
+        long getLid(){
+            return this.lid;
+        }
+        
+        /**
+         * Set close path
+         * 
+         * @param path  close path
+         */
+        void setClosePath(String path){
+            this.closePath = path;
+        }
+        
+        /**
+         * Return close path string.
+         * 
+         * @return String   close path
+         */
+        String getClosePath(){
+            return this.closePath;
+        }
+        
+        
+        /**
+         * Return callback object.
+         * 
+         * @return CloseCallback 
+         */
+        CloseCallback getCb(){
+            return this.cb;
+        }
+     
+    
+        /**
+         * Set value of stat
+         * 
+         * @param stat stat object returned by ZK callback
+         */
+        void setStat (Stat stat){
+            this.stat = stat;
+        }
+        
+        /**
+         * Return value of stat
+         * 
+         * @return Stat
+         */
+        
+        Stat getStat (){
+            return stat;
+        }
+    }
+    
+    /*
+     * BookKeeper parent.
+     */
+    BookKeeper bk;
+    /*
+     * Queue of outstanding operations
+     */
+    ArrayBlockingQueue<LedgerOp> outstandingRequests = 
+        new ArrayBlockingQueue<LedgerOp>(200);
+    
+    
+    /**
+     * Add ledger operation to queue of pending
+     * 
+     * @param op    ledger operation
+     */
+    void addOp(LedgerOp op)
+    throws InterruptedException{
+        LOG.info("Queuing new op");
+        outstandingRequests.put(op);
+    }
+    
+    /**
+     * Constructor takes BookKeeper object 
+     * 
+     * @param bk BookKeeper object
+     */
+    
+    LedgerManagementProcessor(BookKeeper bk){
+        this.bk = bk;
+    }
+    
+    /**
+     * Run method
+     */
+    public void run(){
+        while(true){
+            try{
+                LedgerOp op = outstandingRequests.take();
+            
+                switch(op.getType()){
+                case CREATE:
+                    processCreate((CreateLedgerOp) op);
+                    break;            
+                case OPEN:
+                    processOpen((OpenLedgerOp) op);
+                    break;
+                case CLOSE:
+                    processClose((CloseLedgerOp) op);
+                    break;
+                }
+            } catch(InterruptedException e){
+                LOG.warn("Interrupted while waiting in the queue of incoming requests");   
+            }
+        }
+    }
+    
+    /**
+     * Processes a create ledger operation.
+     * 
+     * @param cop   create ledger operation to process
+     * @throws InterruptedException
+     */
+    
+    private void processCreate(CreateLedgerOp cop)
+    throws InterruptedException {
+        if(cop.getRC() != BKDefs.EOK)
+            cop.getCb().createComplete(cop.getRC(), null, cop.getCtx());
+
+        switch(cop.getAction()){
+        case 0:
+            LOG.info("Action 0 of create");
+            /*
+             * Create ledger node on ZK.
+             * We get the id from the sequence number on the node.
+             */
+            bk.getZooKeeper().create(BKDefs.prefix, 
+                new byte[0], 
+                Ids.OPEN_ACL_UNSAFE, 
+                CreateMode.PERSISTENT_SEQUENTIAL,
+                this,
+                cop);
+        break;
+        case 1:
+            LOG.info("Action 1 of create");
+            /* 
+             * Extract ledger id.
+             */
+            String parts[] = cop.getPath().split("/");
+            String subparts[] = parts[2].split("L");
+            long lId = Long.parseLong(subparts[1]);
+            cop.setLid(lId);
+        
+            LedgerHandle lh = new LedgerHandle(bk, lId, 0, cop.getQuorumSize(), cop.getMode(), cop.getPasswd());
+            cop.setLh(lh);
+            /* 
+             * Get children from "/ledgers/available" on zk
+             */
+
+            bk.getZooKeeper().getChildren("/ledgers/available", false, this, cop);
+            /* 
+             * Select ensSize servers to form the ensemble
+             */
+            bk.getZooKeeper().create(BKDefs.prefix + bk.getZKStringId(lId) + BKDefs.ensemble, new byte[0], 
+                    Ids.OPEN_ACL_UNSAFE, 
+                    CreateMode.PERSISTENT,
+                    this,
+                    cop);
+            /* 
+             * Add quorum size to ZK metadata
+             */
+            ByteBuffer bb = ByteBuffer.allocate(4);
+            bb.putInt(cop.getQuorumSize());
+            
+            bk.getZooKeeper().create(BKDefs.prefix + bk.getZKStringId(lId) + cop.getQuorumSize(), 
+                    bb.array(), 
+                    Ids.OPEN_ACL_UNSAFE, 
+                    CreateMode.PERSISTENT,
+                    this,
+                    cop);
+            /* 
+             * Quorum mode
+             */
+            bb = ByteBuffer.allocate(4);
+            bb.putInt(cop.getMode().ordinal());
+            
+            bk.getZooKeeper().create(BKDefs.prefix + bk.getZKStringId(lId) + cop.getMode(), 
+                    bb.array(), 
+                    Ids.OPEN_ACL_UNSAFE, 
+                    CreateMode.PERSISTENT,
+                    this,
+                    cop);
+            break;
+        case 2:
+            LOG.info("Action 2 of create");
+            /*
+             * Adding bookies to ledger handle
+             */
+            Random r = new Random();
+            List<String> children = cop.getAvailable();
+            for(int i = 0; i < cop.getEnsembleSize(); i++){
+                int index = 0;
+                if(children.size() > 1) 
+                    index = r.nextInt(children.size() - 1);
+                else if(children.size() == 1)
+                    index = 0;
+                else {
+                    LOG.error("Not enough bookies available");    
+                    cop.setRC(BKDefs.EIB);
+                }
+            
+                try{
+                    String bookie = children.remove(index);
+                    LOG.info("Bookie: " + bookie);
+                    InetSocketAddress tAddr = bk.parseAddr(bookie);
+                    int bindex = cop.getLh().addBookie(tAddr); 
+                    ByteBuffer bindexBuf = ByteBuffer.allocate(4);
+                    bindexBuf.putInt(bindex);
+                
+                    String pBookie = "/" + bookie;
+                    cop.zkOpCounter.getAndIncrement();
+                    bk.getZooKeeper().create(BKDefs.prefix + bk.getZKStringId(cop.getLid()) + BKDefs.ensemble + pBookie, 
+                            bindexBuf.array(), 
+                            Ids.OPEN_ACL_UNSAFE, 
+                            CreateMode.PERSISTENT, 
+                            this,
+                            cop);
+                } catch (IOException e) {
+                    LOG.error(e);
+                    i--;
+                } 
+            }
+            break;
+        case 3:
+            LOG.info("Action 3 of create");
+            LOG.debug("Created new ledger");
+            cop.getCb().createComplete(cop.getRC(), cop.getLh(), cop.getCtx());   
+            break;
+        case 4:
+            break;
+        }
+    }
+        
+    /**
+     *  Processes open ledger operation.
+     * 
+     * @param oop   open ledger operation to process.
+     * @throws InterruptedException
+     */
+    private void processOpen(OpenLedgerOp oop) 
+    throws InterruptedException {    
+        /*
+         * Case for open operation
+         */
+        if(oop.getRC() != BKDefs.EOK)
+            oop.getCb().openComplete(oop.getRC(), null, oop.getCtx());
+        
+        switch(oop.getAction()){
+        case 0:                    
+            /*
+             * Check if ledger exists
+             */
+            bk.getZooKeeper().exists(BKDefs.prefix + bk.getZKStringId(oop.getLid()), 
+                    false,
+                    this,
+                    oop);
+            break;
+        case 1:                    
+            /*
+             * Get quorum size.
+             */
+            bk.getZooKeeper().getData(BKDefs.prefix + bk.getZKStringId(oop.getLid()) + BKDefs.quorumSize, 
+                    false,
+                    this,
+                    oop);
+            break;    
+        case 2:         
+            /*
+             * Get last entry written from ZK 
+             */
+                
+            long last = 0;
+            LOG.debug("Close path: " + BKDefs.prefix + bk.getZKStringId(oop.getLid()) + BKDefs.close);
+            bk.getZooKeeper().exists(BKDefs.prefix + bk.getZKStringId(oop.getLid()) + BKDefs.close, 
+                    false,
+                    this,
+                    oop);
+            break;
+        case 3:
+            try{
+                bk.recoverLedger(oop.getLid(), oop.getPasswd());
+            } catch(Exception e){
+                LOG.error("Cannot recover ledger", e);
+                oop.getCb().openComplete(BKDefs.ERL, null, oop.getCtx());
+            }
+            /*
+             * In the case of recovery, it falls through to the
+             * next case intentionally.
+             */
+        case 4:   
+            bk.getZooKeeper().getData(BKDefs.prefix + bk.getZKStringId(oop.getLid()) + BKDefs.close, 
+                    false, 
+                    this,
+                    oop);
+            break;
+        case 5:                
+            /*
+             * Quorum mode 
+             */
+            bk.getZooKeeper().getData(BKDefs.prefix + bk.getZKStringId(oop.getLid()) + BKDefs.quorumMode, 
+                    false, 
+                    this,
+                    oop);
+        case 6:         
+            /*
+             *  Create ledger handle
+             */
+            LedgerHandle lh = new LedgerHandle(bk, oop.getLid(), oop.getLast(), oop.getQSize(), oop.getQMode(), oop.getPasswd());
+                
+            /*
+             * Get children of "/ledgers/id/ensemble" 
+             */
+              
+            bk.getZooKeeper().getChildren(BKDefs.prefix + bk.getZKStringId(oop.getLid()) + BKDefs.ensemble, 
+                    false,
+                    this,
+                    oop);
+            break;
+
+        case 7:
+            List<String> list = oop.getBookieIds();
+            LOG.info("Length of list of bookies: " + list.size());
+            try{
+                for(int i = 0 ; i < list.size() ; i++){
+                    for(String s : list){
+                        byte[] bindex = bk.getZooKeeper().getData(BKDefs.prefix + bk.getZKStringId(oop.getLid()) + BKDefs.ensemble + "/" + s, 
+                                false, new Stat());
+                        ByteBuffer bindexBuf = ByteBuffer.wrap(bindex);
+                        if(bindexBuf.getInt() == i){                      
+                            oop.getLh().addBookie(bk.parseAddr(s));
+                        }
+                    }
+                }
+            } catch(KeeperException e){
+                LOG.error("Exception while adding bookies", e);
+                oop.setRC(BKDefs.EZK);
+            } catch(IOException e){
+                LOG.error("Exception while trying to connect to bookie");
+                oop.setRC(BKDefs.EIO);
+            } finally {
+                oop.getCb().openComplete(oop.getRC(), oop.getLh(), oop.getCtx());
+            }
+        }
+    }    
+    
+    
+   /**
+    * Processes close ledger operation.
+    * 
+    * @param clop   close ledger operation to process.
+    * @throws InterruptedException
+    */
+    
+    private void processClose(CloseLedgerOp clop)
+    throws InterruptedException {
+        if(clop.getRC() != BKDefs.EOK)
+            clop.getCb().closeComplete(clop.getRC(), clop.getLh(), clop.getCtx());
+        
+        switch(clop.getAction()){
+        case 0:
+            LOG.info("Last saved on ZK is: " + clop.getLh().getLast()); 
+            clop.setClosePath(BKDefs.prefix + bk.getZKStringId(getId()) + BKDefs.close);
+            bk.getZooKeeper().exists(clop.getClosePath(), null, this, clop);
+            break;             
+        case 1:
+            if(clop.getStat() == null){
+                bk.getZooKeeper().create(clop.getClosePath(), 
+                        clop.getLast().array(), 
+                        Ids.OPEN_ACL_UNSAFE, 
+                        CreateMode.PERSISTENT, 
+                        this,
+                        clop);
+            } else {
+                bk.getZooKeeper().setData(clop.getClosePath(), 
+                        clop.getLast().array(), -1, this, clop);
+            }
+            break;
+        case 2:   
+            LedgerHandle lh = clop.getLh(); 
+            try{
+                lh.closeUp();
+                StopOp sOp = new StopOp();
+                lh.getQuorumEngine().sendOp(sOp);
+
+            } catch(Exception e) {
+                LOG.warn("Exception while stopping quorum engine: " + lh.getId());
+            }
+            clop.getCb().closeComplete(BKDefs.EOK, clop.getLh(), clop.getCtx());
+        
+            break;
+        }    
+    }
+    
+    /**
+     * Implements org.apache.zookeeper.AsyncCallback.StatCallback 
+     */
+    public void processResult(int rc, String path, Object ctx, Stat stat){
+        LedgerOp op = (LedgerOp) ctx;
+       
+        if(rc != BKDefs.EOK){
+            op.setRC(rc);
+            while(true){
+                try{
+                    this.addOp(op);
+                    return;
+                } catch(InterruptedException e) {
+                    LOG.warn("Interrupted while trying to add operation to queue", e);
+                }
+            }
+        }
+        
+        switch(op.getType()){
+        case CREATE:
+            break;
+        case OPEN:
+            switch(op.getAction()){
+            case 0:
+                if(stat == null)
+                    op.setRC(BKDefs.ENL);
+                break;
+            case 2:
+                /*
+                 * If there is no "close" znode, then we have
+                 * to recover this ledger
+                 */
+                if(stat == null)
+                    op.setAction(3);
+                else
+                    op.setAction(4);
+                break;
+            }
+        case CLOSE:
+            CloseLedgerOp clop = (CloseLedgerOp) op;
+            clop.setStat(stat);
+            clop.setAction(1);
+            break;
+        }
+    
+        /*
+         * Queues operation for processing
+         */
+        int counter = 0;
+        boolean leave = false;
+        while(!leave){
+            try{
+                this.addOp(op);
+                leave = true;
+            } catch(InterruptedException e) {
+                if(counter++ > MAXATTEMPTS){
+                    LOG.error("Exceed maximum number of attempts");
+                    leave = true;
+                } else
+                    LOG.warn("Interrupted while trying to add operation to queue", e);
+            }
+        }
+    
+    }   
+    
+    /**
+     * Implements org.apache.zookeeper.AsyncCallback.StringCallback 
+     */
+    public void processResult(int rc, String path, Object ctx, String name){
+        LedgerOp op = (LedgerOp) ctx;
+        
+        if(rc != BKDefs.EOK){
+            op.setRC(rc);
+        } else switch(op.getType()){
+               case CREATE:
+                   CreateLedgerOp cop = (CreateLedgerOp) op;
+
+                   int counter = cop.zkOpCounter.decrementAndGet(); 
+                   if(op.getAction() == 0){
+                       cop.setAction(1);
+                       cop.setPath(name);
+                       op.setRC(rc);               
+                   } else {
+                       if(counter == 0){
+                           cop.setAction(3);
+                       } else {
+                           /*
+                            * Could queue a no-op, but for optimization 
+                            * purposes, let's return here
+                            */
+                           return;
+                       }
+
+                   }
+                   op = cop;
+                   break;
+               case OPEN:
+                   break;
+               case CLOSE:
+                   CloseLedgerOp clop = (CloseLedgerOp) op;
+                   clop.setAction(1);
+                   break;
+        }
+        
+        /*
+         * Queues operation for processing 
+         */
+        int counter = 0;
+        boolean leave = false;
+        while(!leave){
+            try{
+                this.addOp(op);
+                leave = true;
+            } catch(InterruptedException e) {
+                if(counter++ > MAXATTEMPTS){
+                    LOG.error("Exceed maximum number of attempts");
+                    leave = true;
+                } else
+                    LOG.warn("Interrupted while trying to add operation to queue", e);
+            }
+        }
+        LOG.info("Leaving loop");
+    }
+    
+    /**
+     * Implement org.apache.zookeeper.AsyncCallback.ChildrenCallback
+     */
+    public void processResult(int rc, String path, Object ctx, List<String> children){
+       LedgerOp op = (LedgerOp) ctx;
+       
+       LOG.info("Processing children callback");
+       if(rc != BKDefs.EOK){
+           op.setRC(rc);
+       } else switch(op.getType()){
+              case CREATE:
+                  CreateLedgerOp cop = (CreateLedgerOp) op;
+                  cop.addAvailable(children);
+                  int counter = cop.zkOpCounter.decrementAndGet();
+                  LOG.info("ZK Op counter value: " + counter);
+                  cop.setAction(2);
+                  
+                  op = cop;
+                  break;
+              case OPEN:
+                  OpenLedgerOp oop = (OpenLedgerOp) op;
+                  oop.addBookieIds(children);
+                  break;
+       }
+       
+       int counter = 0;
+       boolean leave = false;
+       while(!leave){
+           try{
+               this.addOp(op);
+               leave = true;
+           } catch(InterruptedException e) {
+               if(counter++ > MAXATTEMPTS){
+                   LOG.error("Exceed maximum number of attempts");
+                   leave = true;
+               } else
+                   LOG.warn("Interrupted while trying to add operation to queue", e);
+           }
+       }
+    }
+    
+    /**
+     * Implement org.apache.zookeeper.AsyncCallback.DataCallback
+     */
+    public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat){
+        LedgerOp op = (LedgerOp) ctx;
+        ByteBuffer bb;
+        
+        if(rc != BKDefs.EOK){
+            op.setRC(rc);
+        } else switch(op.getType()){
+               case OPEN:
+                   OpenLedgerOp oop = (OpenLedgerOp) op;
+                   switch(oop.getAction()){
+                   case 1: 
+                       oop.setQSize(data);
+                       break;
+                   case 4:
+                       bb = ByteBuffer.wrap(data);
+                       oop.setLast(bb.getLong());
+                       break;
+                   case 5:
+                       bb = ByteBuffer.wrap(data);
+                       
+                       switch(bb.getInt()){
+                       case 1:
+                           oop.setQMode(QMode.GENERIC);
+                           LOG.info("Generic ledger");
+                           break;
+                       case 2:
+                           oop.setQMode(QMode.FREEFORM);
+                           break;
+                       default:
+                           oop.setQMode(QMode.VERIFIABLE);
+                       LOG.info("Verifiable ledger");
+                       }
+                   }
+                   break;
+               default:
+                   LOG.warn("Wrong type");
+                   break;  
+        }
+        
+        int counter = 0;
+        boolean leave = false;
+        while(!leave){
+            try{
+                this.addOp(op);
+                leave = true;
+            } catch(InterruptedException e) {
+                if(counter++ > MAXATTEMPTS){
+                    LOG.error("Exceed maximum number of attempts");
+                    leave = true;
+                } else
+                    LOG.warn("Interrupted while trying to add operation to queue", e);
+            }
+        }
+    }
+}

+ 2 - 0
src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/QuorumEngine.java

@@ -27,6 +27,8 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.bookkeeper.client.ClientCBWorker;
 import org.apache.bookkeeper.client.QuorumOpMonitor;
+import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
+import org.apache.bookkeeper.client.AsyncCallback.ReadCallback;
 import org.apache.bookkeeper.client.QuorumOpMonitor.PendingOp;
 import org.apache.bookkeeper.client.QuorumOpMonitor.PendingReadOp;
 import org.apache.bookkeeper.proto.ReadEntryCallback;

+ 1 - 2
src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/QuorumOpMonitor.java

@@ -37,7 +37,6 @@ import javax.crypto.Mac;
 import org.apache.bookkeeper.client.BookieHandle;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BKException.Code;
-import org.apache.bookkeeper.client.ErrorCodes;
 import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.client.QuorumEngine.Operation.AddOp;
 import org.apache.bookkeeper.client.QuorumEngine.Operation.ReadOp;
@@ -183,7 +182,7 @@ public class QuorumOpMonitor implements WriteCallback, ReadEntryCallback {
                         //Call back with error code
                         //sAdd.op.cb.addComplete(ErrorCodes.ENUMRETRIES,
                         //        ledgerId, entryId, sAdd.op.ctx);
-                        sAdd.op.setErrorCode(ErrorCodes.ENUMRETRIES);
+                        sAdd.op.setErrorCode(BKDefs.ENR);
                         sAdd.op.setReady();
                         return;
                     }

+ 0 - 38
src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/ReadCallback.java

@@ -1,38 +0,0 @@
-package org.apache.bookkeeper.client;
-/*
- * 
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- * 
- */
-
-
-/**
- * Callback interface
- */
-
-public interface ReadCallback {
-	/**
-	 * Callback declaration
-	 * 
-	 * @param rc	return code
-	 * @param ledgerId	ledger identifier
-	 * @param seq	sequence of entries
-	 * @param ctx	control object
-	 */
-    void readComplete(int rc, long ledgerId, LedgerSequence seq, Object ctx);
-}

+ 2 - 3
src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/BookieServer.java

@@ -27,7 +27,6 @@ import java.nio.ByteBuffer;
 
 import org.apache.bookkeeper.bookie.Bookie;
 import org.apache.bookkeeper.bookie.BookieException;
-import org.apache.bookkeeper.client.AddCallback;
 import org.apache.bookkeeper.proto.NIOServerFactory.Cnxn;
 import org.apache.log4j.Logger;
 
@@ -37,7 +36,7 @@ import org.apache.log4j.Logger;
  * Implements the server-side part of the BookKeeper protocol.
  *
  */
-public class BookieServer implements NIOServerFactory.PacketProcessor, AddCallback {
+public class BookieServer implements NIOServerFactory.PacketProcessor, WriteCallback {
     int port;
     NIOServerFactory nioServerFactory;
     Bookie bookie;
@@ -178,7 +177,7 @@ public class BookieServer implements NIOServerFactory.PacketProcessor, AddCallba
         }
     }
     
-    public void addComplete(int rc, long ledgerId, long entryId, Object ctx) {
+    public void writeComplete(int rc, long ledgerId, long entryId, Object ctx) {
         Cnxn src = (Cnxn)ctx;
         ByteBuffer bb = ByteBuffer.allocate(24);
         bb.putInt(BookieProtocol.ADDENTRY);

+ 393 - 0
src/contrib/bookkeeper/test/org/apache/bookkeeper/test/AsyncLedgerOpsTest.java

@@ -0,0 +1,393 @@
+package org.apache.bookkeeper.test;
+/*
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * 
+ */
+
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Random;
+import java.util.Set;
+
+import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.AsyncCallback.CloseCallback;
+import org.apache.bookkeeper.client.AsyncCallback.CreateCallback;
+import org.apache.bookkeeper.client.AsyncCallback.OpenCallback;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.client.LedgerSequence;
+import org.apache.bookkeeper.client.AsyncCallback.ReadCallback;
+import org.apache.bookkeeper.client.LedgerHandle.QMode;
+import org.apache.bookkeeper.proto.BookieServer;
+import org.apache.bookkeeper.streaming.LedgerInputStream;
+import org.apache.bookkeeper.streaming.LedgerOutputStream;
+import org.apache.bookkeeper.util.ClientBase;
+import org.apache.log4j.ConsoleAppender;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.log4j.PatternLayout;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.server.NIOServerCnxn;
+import org.apache.zookeeper.server.ZooKeeperServer;
+
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.junit.Test;
+
+//import BookieReadWriteTest.SyncObj;
+//import BookieReadWriteTest.emptyWatcher;
+
+/**
+ * This test tests read and write, synchronous and 
+ * asynchronous, strings and integers for a BookKeeper client. 
+ * The test deployment uses a ZooKeeper server 
+ * and three BookKeepers. 
+ * 
+ */
+
+public class AsyncLedgerOpsTest 
+    extends junit.framework.TestCase 
+    implements AddCallback, 
+    ReadCallback, 
+    CreateCallback,
+    CloseCallback,
+    OpenCallback{
+    static Logger LOG = Logger.getLogger(BookieClientTest.class);
+
+    static ConsoleAppender ca = new ConsoleAppender(new PatternLayout());
+
+    // ZooKeeper related variables
+    private static final String HOSTPORT = "127.0.0.1:2181";
+    static Integer ZooKeeperDefaultPort = 2181;
+    ZooKeeperServer zks;
+    ZooKeeper zkc; //zookeeper client
+    NIOServerCnxn.Factory serverFactory;
+    File ZkTmpDir;
+    
+    //BookKeeper 
+    File tmpDirB1, tmpDirB2, tmpDirB3;
+    BookieServer bs1, bs2, bs3;
+    Integer initialPort = 5000;
+    BookKeeper bkc; // bookkeeper client
+    byte[] ledgerPassword = "aaa".getBytes();
+    LedgerHandle lh, lh2;
+    long ledgerId;
+    LedgerSequence ls;
+    
+    //test related variables 
+    int numEntriesToWrite = 20;
+    int maxInt = 2147483647;
+    Random rng; // Random Number Generator 
+    ArrayList<byte[]> entries; // generated entries
+    ArrayList<Integer> entriesSize;
+    
+    // Synchronization
+    SyncObj sync;
+    Set<Object> syncObjs;
+    
+    class SyncObj {
+        int counter;
+        boolean value;      
+        public SyncObj() {
+            counter = 0;
+            value = false;
+        }       
+    }
+    
+    class ControlObj{
+        LedgerHandle lh;
+        
+        void setLh(LedgerHandle lh){
+            this.lh = lh;
+        }
+        
+        LedgerHandle getLh(){
+            return lh;
+        }
+    }
+    
+    @Test
+    public void testAsyncCreateClose() throws IOException{
+        try {
+            // Create a BookKeeper client and a ledger
+            bkc = new BookKeeper("127.0.0.1");
+           
+            ControlObj ctx = new ControlObj();
+            
+            synchronized(ctx){
+                bkc.asyncCreateLedger(3, 2, 
+                    QMode.VERIFIABLE, 
+                    ledgerPassword,
+                    this,
+                    ctx);
+                ctx.wait();
+            }
+            
+            
+            //bkc.initMessageDigest("SHA1");
+            LedgerHandle lh = ctx.getLh();
+            ledgerId = lh.getId();
+            LOG.info("Ledger ID: " + lh.getId());
+            for(int i = 0; i < numEntriesToWrite; i++){
+                ByteBuffer entry = ByteBuffer.allocate(4);
+                entry.putInt(rng.nextInt(maxInt));
+                entry.position(0);
+                
+                entries.add(entry.array());
+                entriesSize.add(entry.array().length);
+                lh.asyncAddEntry(entry.array(), this, sync);
+            }
+            
+            // wait for all entries to be acknowledged
+            synchronized (sync) {
+                if (sync.counter < numEntriesToWrite){
+                    LOG.debug("Entries counter = " + sync.counter);
+                    sync.wait();
+                }
+            }
+            
+            LOG.debug("*** WRITE COMPLETE ***");
+            // close ledger 
+            synchronized(ctx){
+                lh.asyncClose(this, ctx);
+                ctx.wait();
+            }
+            
+            //*** WRITING PART COMPLETE // READ PART BEGINS ***
+            
+            // open ledger
+            synchronized(ctx){
+                bkc.asyncOpenLedger(ledgerId, ledgerPassword, this, ctx);
+                ctx.wait();
+            }
+            lh = ctx.getLh();
+            
+            LOG.debug("Number of entries written: " + lh.getLast());
+            assertTrue("Verifying number of entries written", lh.getLast() == numEntriesToWrite);       
+            
+            //read entries
+            lh.asyncReadEntries(0, numEntriesToWrite - 1, this, (Object) sync);
+            
+            synchronized (sync) {
+                while(sync.value == false){
+                    sync.wait();
+                }               
+            }
+            
+            assertTrue("Checking number of read entries", ls.size() == numEntriesToWrite);
+            
+            LOG.debug("*** READ COMPLETE ***");
+            
+            // at this point, LedgerSequence ls is filled with the returned values
+            int i = 0;
+            while(ls.hasMoreElements()){
+                ByteBuffer origbb = ByteBuffer.wrap(entries.get(i));
+                Integer origEntry = origbb.getInt();
+                byte[] entry = ls.nextElement().getEntry();
+                ByteBuffer result = ByteBuffer.wrap(entry);
+                LOG.debug("Length of result: " + result.capacity());
+                LOG.debug("Original entry: " + origEntry);
+
+                Integer retrEntry = result.getInt();
+                LOG.debug("Retrieved entry: " + retrEntry);
+                assertTrue("Checking entry " + i + " for equality", origEntry.equals(retrEntry));
+                assertTrue("Checking entry " + i + " for size", entry.length == entriesSize.get(i).intValue());
+                i++;
+            }
+            lh.close();
+        } catch (KeeperException e) {
+            e.printStackTrace();
+        } catch (BKException e) {
+            e.printStackTrace();
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+        } //catch (NoSuchAlgorithmException e) {
+        //  e.printStackTrace();
+        //}
+        
+    }
+    
+    
+    public void addComplete(int rc, LedgerHandle lh, long entryId, Object ctx) {
+        SyncObj x = (SyncObj) ctx;
+        synchronized (x) {
+            x.counter++;
+            x.notify();
+        }
+    }
+
+    public void readComplete(int rc, LedgerHandle lh, LedgerSequence seq,
+            Object ctx) {
+        ls = seq;               
+        synchronized (sync) {
+            sync.value = true;
+            sync.notify();
+        }
+        
+    }
+    
+    public void createComplete(int rc, LedgerHandle lh, Object ctx){
+        synchronized(ctx){
+            ControlObj cobj = (ControlObj) ctx;
+            cobj.setLh(lh);
+            cobj.notify();
+        }   
+    }
+    
+    public void openComplete(int rc, LedgerHandle lh, Object ctx){
+        synchronized(ctx){
+            ControlObj cobj = (ControlObj) ctx;
+            cobj.setLh(lh);
+            cobj.notify();
+        }   
+    }
+    
+    public void closeComplete(int rc, LedgerHandle lh, Object ctx){
+        synchronized(ctx){
+            ControlObj cobj = (ControlObj) ctx;
+            cobj.notify();
+        }
+    }
+     
+    protected void setUp() throws IOException {
+        LOG.addAppender(ca);
+        LOG.setLevel((Level) Level.DEBUG);
+        
+        // create a ZooKeeper server(dataDir, dataLogDir, port)
+        LOG.debug("Running ZK server");
+        //ServerStats.registerAsConcrete();
+        ClientBase.setupTestEnv();
+        ZkTmpDir = File.createTempFile("zookeeper", "test");
+        ZkTmpDir.delete();
+        ZkTmpDir.mkdir();
+            
+        try {
+            zks = new ZooKeeperServer(ZkTmpDir, ZkTmpDir, ZooKeeperDefaultPort);
+            serverFactory =  new NIOServerCnxn.Factory(ZooKeeperDefaultPort);
+            serverFactory.startup(zks);
+        } catch (IOException e1) {
+            // TODO Auto-generated catch block
+            e1.printStackTrace();
+        } catch (InterruptedException e) {
+            // TODO Auto-generated catch block
+            e.printStackTrace();
+        }
+        boolean b = ClientBase.waitForServerUp(HOSTPORT, ClientBase.CONNECTION_TIMEOUT);
+        
+        LOG.debug("Server up: " + b);
+        
+        // create a zookeeper client
+        LOG.debug("Instantiate ZK Client");
+        zkc = new ZooKeeper("127.0.0.1", ZooKeeperDefaultPort, new emptyWatcher());
+        
+        //initialize the zk client with values
+        try {
+            zkc.create("/ledgers", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+            zkc.create("/ledgers/available", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+            zkc.create("/ledgers/available/127.0.0.1:" + Integer.toString(initialPort), new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+            zkc.create("/ledgers/available/127.0.0.1:" + Integer.toString(initialPort + 1), new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+            zkc.create("/ledgers/available/127.0.0.1:" + Integer.toString(initialPort + 2), new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        } catch (KeeperException e) {
+            // TODO Auto-generated catch block
+            e.printStackTrace();
+        } catch (InterruptedException e) {
+            // TODO Auto-generated catch block
+            e.printStackTrace();
+        }
+        
+        // Create Bookie Servers (B1, B2, B3)
+        tmpDirB1 = File.createTempFile("bookie1", "test");
+        tmpDirB1.delete();
+        tmpDirB1.mkdir();
+         
+        bs1 = new BookieServer(initialPort, tmpDirB1, new File[]{tmpDirB1});
+        bs1.start();
+        
+        tmpDirB2 = File.createTempFile("bookie2", "test");
+        tmpDirB2.delete();
+        tmpDirB2.mkdir();
+            
+        bs2 = new BookieServer(initialPort + 1, tmpDirB2, new File[]{tmpDirB2});
+        bs2.start();
+
+        tmpDirB3 = File.createTempFile("bookie3", "test");
+        tmpDirB3.delete();
+        tmpDirB3.mkdir();
+        
+        bs3 = new BookieServer(initialPort + 2, tmpDirB3, new File[]{tmpDirB3});
+        bs3.start();
+        
+        rng = new Random(System.currentTimeMillis());   // Initialize the Random Number Generator 
+        entries = new ArrayList<byte[]>(); // initialize the  entries list
+        entriesSize = new ArrayList<Integer>(); 
+        sync = new SyncObj(); // initialize the synchronization data structure
+    }
+    
+    protected void tearDown(){
+        LOG.info("TearDown");
+
+        //shutdown bookie servers 
+        try {
+            bs1.shutdown();
+            bs2.shutdown();
+            bs3.shutdown();
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+        }
+        cleanUpDir(tmpDirB1);
+        cleanUpDir(tmpDirB2);
+        cleanUpDir(tmpDirB3);
+        
+        //shutdown ZK server
+        serverFactory.shutdown();
+        assertTrue("waiting for server down",
+                ClientBase.waitForServerDown(HOSTPORT,
+                                             ClientBase.CONNECTION_TIMEOUT));
+        //ServerStats.unregister();
+        cleanUpDir(ZkTmpDir);
+        
+    }
+
+    /*  Clean up a directory recursively */
+    protected boolean cleanUpDir(File dir){
+        if (dir.isDirectory()) {
+            LOG.info("Cleaning up " + dir.getName());
+            String[] children = dir.list();
+            for (String string : children) {
+                boolean success = cleanUpDir(new File(dir, string));
+                if (!success) return false;
+            }
+        }
+        // The directory is now empty so delete it
+        return dir.delete();        
+    }
+
+    /*  User for testing purposes, void */
+    class emptyWatcher implements Watcher{
+        public void process(WatchedEvent event) {}
+    }
+}

+ 9 - 4
src/contrib/bookkeeper/test/org/apache/bookkeeper/test/BookieReadWriteTest.java

@@ -29,12 +29,12 @@ import java.util.ArrayList;
 import java.util.Random;
 import java.util.Set;
 
-import org.apache.bookkeeper.client.AddCallback;
+import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.client.LedgerSequence;
-import org.apache.bookkeeper.client.ReadCallback;
+import org.apache.bookkeeper.client.AsyncCallback.ReadCallback;
 import org.apache.bookkeeper.proto.BookieServer;
 import org.apache.bookkeeper.streaming.LedgerInputStream;
 import org.apache.bookkeeper.streaming.LedgerOutputStream;
@@ -475,7 +475,10 @@ public class BookieReadWriteTest
     }
     
     
-	public void addComplete(int rc, long ledgerId, long entryId, Object ctx) {
+	public void addComplete(int rc, 
+	        LedgerHandle lh, 
+	        long entryId, 
+	        Object ctx) {
 		SyncObj x = (SyncObj) ctx;
 		synchronized (x) {
 			x.counter++;
@@ -483,7 +486,9 @@ public class BookieReadWriteTest
 		}
 	}
 
-	public void readComplete(int rc, long ledgerId, LedgerSequence seq,
+	public void readComplete(int rc, 
+	        LedgerHandle lh, 
+	        LedgerSequence seq,
 			Object ctx) {
 		ls = seq;				
 		synchronized (sync) {