Jelajahi Sumber

ZOOKEEPER-356. Masking bookie failure during writes to a ledger

git-svn-id: https://svn.apache.org/repos/asf/hadoop/zookeeper/trunk@787907 13f79535-47bb-0310-9956-ffa450edef68
Benjamin Reed 16 tahun lalu
induk
melakukan
e031674368
21 mengubah file dengan 1787 tambahan dan 722 penghapusan
  1. 2 0
      CHANGES.txt
  2. 1 1
      src/contrib/bookkeeper/src/java/org/apache/bookkeeper/bookie/Bookie.java
  3. 0 1
      src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/AsyncCallback.java
  4. 38 0
      src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BKDefs.java
  5. 23 1
      src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BKException.java
  6. 134 71
      src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BookKeeper.java
  7. 165 40
      src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BookieHandle.java
  8. 12 12
      src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/ClientCBWorker.java
  9. 218 25
      src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerHandle.java
  10. 142 14
      src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerManagementProcessor.java
  11. 5 4
      src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerRecoveryMonitor.java
  12. 49 16
      src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/QuorumEngine.java
  13. 46 34
      src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/QuorumOpMonitor.java
  14. 120 56
      src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/BookieClient.java
  15. 5 0
      src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/BookieServer.java
  16. 5 0
      src/contrib/bookkeeper/src/java/org/apache/bookkeeper/streaming/LedgerOutputStream.java
  17. 10 8
      src/contrib/bookkeeper/test/org/apache/bookkeeper/test/BookieClientTest.java
  18. 385 0
      src/contrib/bookkeeper/test/org/apache/bookkeeper/test/BookieFailureTest.java
  19. 408 404
      src/contrib/bookkeeper/test/org/apache/bookkeeper/test/BookieReadWriteTest.java
  20. 10 16
      src/contrib/bookkeeper/test/org/apache/bookkeeper/test/CloseTest.java
  21. 9 19
      src/contrib/bookkeeper/test/org/apache/bookkeeper/test/LedgerRecoveryTest.java

+ 2 - 0
CHANGES.txt

@@ -232,6 +232,8 @@ NIOServerCnxn. (phunt via mahadev)
   ZOOKEEPER-329. document how to integrate 3rd party authentication into ZK
 server ACLs. (breed via mahadev)
 
+  ZOOKEEPER-356. Masking bookie failure during writes to a ledger (flavio via breed)
+
 NEW FEATURES:
 
   ZOOKEEPER-371. jdiff documentation included in build/release (giri via phunt)

+ 1 - 1
src/contrib/bookkeeper/src/java/org/apache/bookkeeper/bookie/Bookie.java

@@ -44,7 +44,7 @@ import org.apache.log4j.Logger;
 
 public class Bookie extends Thread {
     HashMap<Long, LedgerDescriptor> ledgers = new HashMap<Long, LedgerDescriptor>();
-    Logger LOG = Logger.getLogger(Bookie.class);
+    static Logger LOG = Logger.getLogger(Bookie.class);
     /**
      * 4 byte signature followed by 2-byte major and 2-byte minor versions
      */

+ 0 - 1
src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/AsyncCallback.java

@@ -78,5 +78,4 @@ public interface AsyncCallback {
          */
         void readComplete(int rc, LedgerHandle lh, LedgerSequence seq, Object ctx);
     }
-    
 }

+ 38 - 0
src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BKDefs.java

@@ -26,12 +26,50 @@ public interface BKDefs {
      * String used to construct znode paths. They are used in BookKeeper
      *  and LedgerManagementProcessor.
      */
+    
+    /*
+     * Path to ledger metadata. ZooKeeper appends a sequence number to L.
+     */
     static public final String prefix = "/ledgers/L";
+    
+    /*
+     * Parent node to store ensemble composition. Each child corresponds to
+     * one bookie.
+     */
     static public final String ensemble = "/ensemble"; 
+    
+    /*
+     * Quorum size.
+     */
     static public final String quorumSize = "/quorum";
+    
+    /*
+     * Close node.
+     */
     static public final String close = "/close";
+    
+    /*
+     * Quorum mode: VERIFYING or GENERIC
+     */
     static public final String quorumMode = "/mode";
     
+    /*
+     * Marks failure points during writes to the ledger.
+     */
+    static public final String quorumEvolution = "/quorum_evolution";
+    
+    /*
+     * Ledger is in write mode
+     */
+    
+    static public final int WRITE = 0;
+
+    /*
+     * Ledger is in read mode
+     */
+
+    static public final int READ = 1;
+    
     /**
      * Status ok
      */

+ 23 - 1
src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BKException.java

@@ -48,8 +48,12 @@ public abstract class BKException extends Exception {
             return new BKDigestNotInitializedException();
         case Code.DigestMatchException:
             return new BKDigestMatchException();
+        case Code.NotEnoughBookiesException:
+            return new BKNotEnoughBookiesException();
         case Code.NoSuchLedgerExistsException:
             return new BKNoSuchLedgerExistsException();
+        case Code.BookieHandleNotAvailableException:
+            return new BKBookieHandleNotAvailableException();
         default:
             return new BKIllegalOpException();
         }
@@ -62,7 +66,9 @@ public abstract class BKException extends Exception {
         int NoBookieAvailableException = -3;
         int DigestNotInitializedException = -4;
         int DigestMatchException = -5;
-        int NoSuchLedgerExistsException = -6;
+        int NotEnoughBookiesException = -6;
+        int NoSuchLedgerExistsException = -7;
+        int BookieHandleNotAvailableException = -8;
         
         int IllegalOpException = -100;
     }
@@ -89,8 +95,12 @@ public abstract class BKException extends Exception {
             return "Digest engine not initialized";
         case Code.DigestMatchException:
             return "Entry digest does not match";
+        case Code.NotEnoughBookiesException:
+            return "Not enough non-faulty bookies available";
         case Code.NoSuchLedgerExistsException:
             return "No such ledger exists";
+        case Code.BookieHandleNotAvailableException:
+            return "Bookie handle is not available";
         default:
             return "Invalid operation";
         }
@@ -132,10 +142,22 @@ public abstract class BKException extends Exception {
         }   
     }
     
+    public static class BKNotEnoughBookiesException extends BKException {
+        public BKNotEnoughBookiesException(){
+            super(Code.NotEnoughBookiesException);
+        }
+    }
+
     public static class BKNoSuchLedgerExistsException extends BKException {
         public BKNoSuchLedgerExistsException(){
             super(Code.NoSuchLedgerExistsException);
         }   
     }
+    
+    public static class BKBookieHandleNotAvailableException extends BKException {
+        public BKBookieHandleNotAvailableException(){
+            super(Code.BookieHandleNotAvailableException);
+        }   
+    }
 }
     

+ 134 - 71
src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BookKeeper.java

@@ -24,6 +24,7 @@ package org. apache.bookkeeper.client;
 import java.io.IOException;
 import java.net.ConnectException;
 import java.nio.ByteBuffer;
+import java.nio.channels.UnresolvedAddressException;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
@@ -58,6 +59,8 @@ import org.apache.zookeeper.WatchedEvent;
  * There are three possible operations: start a new ledger, 
  * write to a ledger, and read from a ledger.
  * 
+ * For the ZooKeeper layout, please refer to BKDefs.java.
+ * 
  */
 
 public class BookKeeper 
@@ -142,6 +145,8 @@ implements Watcher {
         IOException, BKException {
         // Check that quorum size follows the minimum
         long t;
+        LedgerHandle lh = null;
+        
         switch(mode){
         case VERIFIABLE:
             t = java.lang.Math.round(java.lang.Math.floor((ensSize - 1)/2));
@@ -171,71 +176,77 @@ implements Watcher {
          */
         String parts[] = path.split("/");
         String subparts[] = parts[2].split("L");
-        long lId = Long.parseLong(subparts[1]);
-        /* 
-         * Get children from "/ledgers/available" on zk
-         */
-        List<String> list = 
-            zk.getChildren("/ledgers/available", false);
-        ArrayList<InetSocketAddress> lBookies = new ArrayList<InetSocketAddress>();
-        /* 
-         * Select ensSize servers to form the ensemble
-         */
-        path = zk.create(BKDefs.prefix + getZKStringId(lId) + BKDefs.ensemble, new byte[0], 
-                Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-        /* 
-         * Add quorum size to ZK metadata
-         */
-        ByteBuffer bb = ByteBuffer.allocate(4);
-        bb.putInt(qSize);
-        zk.create(BKDefs.prefix + getZKStringId(lId) + BKDefs.quorumSize, bb.array(), 
-                Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-        /* 
-         * Quorum mode
-         */
-        bb = ByteBuffer.allocate(4);
-        bb.putInt(mode.ordinal());
-        zk.create(BKDefs.prefix + getZKStringId(lId) + BKDefs.quorumMode, bb.array(), 
-                Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-        /* 
-         * Create QuorumEngine
-         */
-        LedgerHandle lh = new LedgerHandle(this, lId, 0, qSize, mode, passwd);
-        //qeMap.put(lId, queue);
-        /*
-         * Adding bookies to ledger handle
-         */
-        Random r = new Random();
+        try{
+            long lId = Long.parseLong(subparts[1]);
+       
+            /* 
+             * Get children from "/ledgers/available" on zk
+             */
+            List<String> list = 
+                zk.getChildren("/ledgers/available", false);
+            ArrayList<InetSocketAddress> lBookies = new ArrayList<InetSocketAddress>();
+            /* 
+             * Select ensSize servers to form the ensemble
+             */
+            path = zk.create(BKDefs.prefix + getZKStringId(lId) + BKDefs.ensemble, new byte[0], 
+                    Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+         
+            /* 
+             * Add quorum size to ZK metadata
+             */
+            ByteBuffer bb = ByteBuffer.allocate(4);
+            bb.putInt(qSize);
+            zk.create(BKDefs.prefix + getZKStringId(lId) + BKDefs.quorumSize, bb.array(), 
+                    Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+            /* 
+             * Quorum mode
+             */
+            bb = ByteBuffer.allocate(4);
+            bb.putInt(mode.ordinal());
+            zk.create(BKDefs.prefix + getZKStringId(lId) + BKDefs.quorumMode, bb.array(), 
+                    Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+            /* 
+             * Create QuorumEngine
+             */
+            lh = new LedgerHandle(this, lId, 0, qSize, mode, passwd);
+            
+            /*
+             * Adding bookies to ledger handle
+             */
+            Random r = new Random();
         
-        for(int i = 0; i < ensSize; i++){
-        	int index = 0;
-        	if(list.size() > 1) 
-        		index = r.nextInt(list.size() - 1);
-        	else if(list.size() == 1)
-        	    index = 0;
-        	else {
-        	    LOG.error("Not enough bookies available");
+            for(int i = 0; i < ensSize; i++){
+                int index = 0;
+                if(list.size() > 1) 
+                    index = r.nextInt(list.size() - 1);
+                else if(list.size() == 1)
+                    index = 0;
+                else {
+                    LOG.error("Not enough bookies available");
         	    
-        	    return null;
-        	}
+                    return null;
+                }
             
-        	try{
-        	    String bookie = list.remove(index);
-        	    LOG.info("Bookie: " + bookie);
-        	    InetSocketAddress tAddr = parseAddr(bookie);
-        	    int bindex = lh.addBookie(tAddr); 
-        	    ByteBuffer bindexBuf = ByteBuffer.allocate(4);
-        	    bindexBuf.putInt(bindex);
+                try{
+                    String bookie = list.remove(index);
+                    LOG.info("Bookie: " + bookie);
+                    InetSocketAddress tAddr = parseAddr(bookie);
+                    int bindex = lh.addBookieForWriting(tAddr); 
+                    ByteBuffer bindexBuf = ByteBuffer.allocate(4);
+                    bindexBuf.putInt(bindex);
         	    
-        	    String pBookie = "/" + bookie;
-        	    zk.create(BKDefs.prefix + getZKStringId(lId) + BKDefs.ensemble + pBookie, bindexBuf.array(), 
-        	            Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-        	} catch (IOException e) {
-        	    LOG.error(e);
-        	    i--;
-        	} 
+                    String pBookie = "/" + bookie;
+                    zk.create(BKDefs.prefix + getZKStringId(lId) + BKDefs.ensemble + pBookie, bindexBuf.array(), 
+                            Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+                } catch (IOException e) {
+                    LOG.error(e);
+                    i--;
+                } 
+            }
+            LOG.debug("Created new ledger");
+        } catch (NumberFormatException e) {
+            LOG.error("Error when parsing the ledger identifier", e);
         }
-        LOG.debug("Created new ledger");
         // Return ledger handler
         return lh; 
     }
@@ -333,7 +344,6 @@ implements Watcher {
          */
         data = zk.getData(BKDefs.prefix + getZKStringId(lId) + BKDefs.quorumMode, false, stat);
         buf = ByteBuffer.wrap(data);
-        //int ordinal = buf.getInt();
         
         QMode qMode;
         switch(buf.getInt()){
@@ -361,22 +371,62 @@ implements Watcher {
         List<String> list = 
             zk.getChildren(BKDefs.prefix + getZKStringId(lId) + BKDefs.ensemble, false);
         
-        LOG.info("Length of list of bookies: " + list.size());
+        LOG.debug("Length of list of bookies: " + list.size());
         for(int i = 0 ; i < list.size() ; i++){
             for(String s : list){
+                LOG.debug("Extracting bookie: " + s);
                 byte[] bindex = zk.getData(BKDefs.prefix + getZKStringId(lId) + BKDefs.ensemble + "/" + s, false, stat);
                 ByteBuffer bindexBuf = ByteBuffer.wrap(bindex);
                 if(bindexBuf.getInt() == i){                      
                     try{
-                        lh.addBookie(parseAddr(s));
+                        lh.addBookieForReading(parseAddr(s));
                     } catch (IOException e){
                         LOG.error(e);
                     }
                 }
             }
         }
+        
+        /*
+         * Read changes to quorum over time. To determine if there have been changes during
+         * writes to the ledger, check if there is a znode called quorumEvolution.
+         */
+        if(zk.exists(BKDefs.prefix + 
+                getZKStringId(lh.getId()) +  
+                BKDefs.quorumEvolution, false) != null){
+                    String path = BKDefs.prefix + 
+                    getZKStringId(lh.getId()) +  
+                    BKDefs.quorumEvolution;
+                    
+                    List<String> faultList = zk.getChildren(path, false);
+                    try{
+                        for(String s : faultList){
+                            LOG.debug("Faulty list child: " + s);
+                            long entry = Long.parseLong(s);
+                            String addresses = new String(zk.getData(path + "/" + s, false, stat));
+                            String parts[] = addresses.split(" ");
+
+                            ArrayList<BookieHandle> newBookieSet = new ArrayList<BookieHandle>();
+                            for(int i = 0 ; i < parts.length ; i++){
+                                LOG.debug("Address: " + parts[i]);
+                                InetSocketAddress faultyBookie =  
+                                    parseAddr(parts[i].substring(1));                           
+                        
+                                newBookieSet.add(lh.getBookieHandleDup(faultyBookie));
+                            }
+                            lh.setNewBookieConfig(entry, newBookieSet);
+                            LOG.debug("NewBookieSet size: " + newBookieSet.size());
+                        }
+
+                        lh.prepareEntryChange();
+                    } catch (NumberFormatException e) {
+                        LOG.error("Error when parsing the ledger identifier", e);
+                    }
+                }
       
-        // Return ledger handler
+        /*
+         *  Return ledger handler
+         */
         return lh;
     }    
     
@@ -518,12 +568,14 @@ implements Watcher {
      *  @param	a	InetSocketAddress
      */
     
-    synchronized BookieHandle getBookieHandle(InetSocketAddress a)
+    synchronized BookieHandle getBookieHandle(LedgerHandle lh, InetSocketAddress a)
     throws ConnectException, IOException {
     	if(!bhMap.containsKey(a)){
-    		bhMap.put(a, new BookieHandle(a));
+    	    BookieHandle bh = new BookieHandle(a, true); 
+    		bhMap.put(a, bh);
+    		bh.start();
     	}
-    	bhMap.get(a).incRefCount();
+    	bhMap.get(a).incRefCount(lh);
     	
     	return bhMap.get(a);
     }
@@ -533,9 +585,10 @@ implements Watcher {
      * remove it from the list. 
      */
     
-    synchronized void haltBookieHandles(ArrayList<BookieHandle> bookies){
-        for(BookieHandle bh : bookies){
-            if(bh.halt() <= 0)
+    synchronized void haltBookieHandles(LedgerHandle lh, ArrayList<BookieHandle> bookies){
+        while(bookies.size() > 0){
+            BookieHandle bh = bookies.remove(0);
+            if(bh.halt(lh) <= 0)
                 bhMap.remove(bh.addr);
         }
     }
@@ -549,5 +602,15 @@ implements Watcher {
         bookieBlackList.add(addr);
     }
     
-   
+    /**
+     * Halts all bookie handles
+     * 
+     */
+    public void halt() throws InterruptedException{
+        
+        for(BookieHandle bh: bhMap.values()){
+            bh.shutdown();
+        }
+        zk.close();
+    }
 }

+ 165 - 40
src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BookieHandle.java

@@ -24,6 +24,8 @@ package org.apache.bookkeeper.client;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.security.NoSuchAlgorithmException;
@@ -31,12 +33,15 @@ import java.security.InvalidKeyException;
 import javax.crypto.Mac; 
 import javax.crypto.spec.SecretKeySpec;
 
+import org.apache.bookkeeper.client.BKException.Code;
 import org.apache.bookkeeper.client.LedgerHandle.QMode;
 import org.apache.bookkeeper.client.QuorumEngine.Operation;
+import org.apache.bookkeeper.client.QuorumEngine.Operation.StopOp;
 import org.apache.bookkeeper.client.QuorumEngine.SubOp;
 import org.apache.bookkeeper.client.QuorumEngine.Operation.AddOp;
 import org.apache.bookkeeper.client.QuorumEngine.SubOp.SubAddOp;
 import org.apache.bookkeeper.client.QuorumEngine.SubOp.SubReadOp;
+import org.apache.bookkeeper.client.QuorumEngine.SubOp.SubStopOp;
 import org.apache.bookkeeper.proto.BookieClient;
 import org.apache.log4j.Logger;
 
@@ -47,15 +52,17 @@ import org.apache.log4j.Logger;
  * 
  */
 
-class BookieHandle extends Thread{
-    Logger LOG = Logger.getLogger(BookieClient.class);
+public class BookieHandle extends Thread {
+    static Logger LOG = Logger.getLogger(BookieClient.class);
     
-    boolean stop = false;
+    volatile boolean stop = false;
+    boolean noreception = false;
     private BookieClient client;
     InetSocketAddress addr;
     static int recvTimeout = 2000;
     private ArrayBlockingQueue<ToSend> incomingQueue;
     private int refCount = 0;
+    HashSet<LedgerHandle> ledgers;
     
     /**
      * Objects of this class are queued waiting to be
@@ -79,13 +86,17 @@ class BookieHandle extends Thread{
      * @param addr	address of the bookkeeper server that this
      * handle should connect to.
      */
-    BookieHandle(InetSocketAddress addr) throws IOException {
-        this.client = new BookieClient(addr, recvTimeout);
+    BookieHandle(InetSocketAddress addr, boolean enabled) throws IOException {
+        this.stop = !enabled;
+        this.noreception = !enabled;
+        if(!stop)
+            this.client = new BookieClient(addr, recvTimeout);
+        else
+            this.client = null;
+        
         this.addr = addr;
         this.incomingQueue = new ArrayBlockingQueue<ToSend>(2000);
-        
-        //genSecurePadding();
-        start();
+        this.ledgers = new HashSet<LedgerHandle>();
     }
     
     
@@ -100,22 +111,39 @@ class BookieHandle extends Thread{
     }
 
     /**
-     * Sending add operation to bookie
+     * Sending add operation to bookie. We have to synchronize the send to guarantee
+     * that requests will either get a response or throw an exception. 
      * 
      * @param r
      * @param cb
      * @param ctx
      * @throws IOException
      */
-    public void sendAdd(LedgerHandle lh, SubAddOp r, long entry)
-    throws IOException {
+    public synchronized void sendAdd(LedgerHandle lh, SubAddOp r, long entry)
+    throws IOException, BKException {
         try{
-            incomingQueue.put(new ToSend(lh, r, entry));
+            if(!noreception){
+                ToSend ts = new ToSend(lh, r, entry);
+                if(!incomingQueue.offer(ts, 1000, TimeUnit.MILLISECONDS))
+                    throw BKException.create(Code.BookieHandleNotAvailableException);
+            } else {
+                throw BKException.create(Code.BookieHandleNotAvailableException);
+            }
         } catch(InterruptedException e){
             LOG.warn("Interrupted while waiting for room in the incoming queue");
         }
     }
     
+    private synchronized void sendStop(){
+        try{
+            noreception = true;
+            LOG.debug("Sending stop signal");
+            incomingQueue.put(new ToSend(null, new SubStopOp(new StopOp()), -1));
+            LOG.debug("Sent stop signal");
+        } catch(InterruptedException e) {
+            LOG.fatal("Interrupted while sending stop signal to bookie handle");
+        }       
+    }
     /**
      * MAC instance
      * 
@@ -142,29 +170,41 @@ class BookieHandle extends Thread{
      * @throws IOException
      */
     
-    public void sendRead(LedgerHandle lh, SubReadOp r, long entry)
-    throws IOException {
+    public synchronized void sendRead(LedgerHandle lh, SubReadOp r, long entry)
+    throws IOException, BKException {
         try{
-            incomingQueue.put(new ToSend(lh, r, entry));
+            if(!noreception){           
+                ToSend ts = new ToSend(lh, r, entry);
+                if(!incomingQueue.offer(ts, 1000, TimeUnit.MILLISECONDS))
+                    throw BKException.create(Code.BookieHandleNotAvailableException);
+            } else {
+                throw BKException.create(Code.BookieHandleNotAvailableException);
+            }
         } catch(InterruptedException e){
             LOG.warn("Interrupted while waiting for room in the incoming queue");
         }
     }
     
     public void run(){
-        while(!stop){
-            try{
-                ToSend ts = incomingQueue.poll(1000, TimeUnit.MILLISECONDS);
+        ToSend ts;
+        
+        try{
+            while(!stop){
+                ts = incomingQueue.poll(1000, TimeUnit.MILLISECONDS);
+                    
                 if(ts != null){
                 	LedgerHandle self = ts.lh;
                     switch(ts.type){
+                    case Operation.STOP:
+                        LOG.info("Stopping BookieHandle: " + addr);
+                        client.errorOut();                   
+                        cleanQueue();
+                        LOG.debug("Stopped");
+                        break;
                     case Operation.ADD:
                         SubAddOp aOp = (SubAddOp) ts.ctx;
                         AddOp op = ((AddOp) aOp.op);
                         
-                        /*
-                         * TODO: Really add the confirmed add to the op
-                         */
                         long confirmed = self.getAddConfirmed();
                         ByteBuffer extendedData;
     
@@ -179,7 +219,6 @@ class BookieHandle extends Thread{
                             extendedData.rewind();
                             byte[] toProcess = new byte[op.data.length + 24];
                             extendedData.get(toProcess, 0, op.data.length + 24);
-                            //extendedData.limit(extendedData.capacity() - 20);
                             extendedData.position(extendedData.capacity() - 20);
                             if(mac == null)
                                 getMac(self.getMacKey(), "HmacSHA1");
@@ -200,47 +239,133 @@ class BookieHandle extends Thread{
                                 ts.ctx);
                         break;
                     case Operation.READ:
-                        client.readEntry(self.getId(),
-                            ts.entry,
-                            ((SubReadOp) ts.ctx).rcb,
-                            ts.ctx);
+                        if(client != null)
+                            client.readEntry(self.getId(),
+                                    ts.entry,
+                                    ((SubReadOp) ts.ctx).rcb,
+                                    ts.ctx);
+                        else ((SubReadOp) ts.ctx).rcb.readEntryComplete(-1, ts.lh.getId(), ts.entry, null, ts.ctx);
                         break;
                     }
-                }
-            } catch (InterruptedException e){
-                LOG.error(e);
-            } catch (IOException e){
-                LOG.error(e);
-            } catch (NoSuchAlgorithmException e){
-                LOG.error(e);
-            } catch (InvalidKeyException e) {
-                LOG.error(e);
+                } else LOG.warn("Empty queue: " + addr);
             }
-        }
+        } catch (Exception e){
+            LOG.error("Handling exception before halting BookieHandle", e);
+            for(LedgerHandle lh : ledgers)
+                lh.removeBookie(this);
+            
+            /*
+             * We only need to synchronize when setting noreception, to prevent
+             * a client thread from adding another request to the incomingQueue
+             * after we have cleaned it.
+             */
+            synchronized(this){
+                noreception = true;
+            }
+            client.halt();
+            client.errorOut();
+            cleanQueue();
+        } 
+        
+        LOG.info("Exiting bookie handle thread: " + addr);
     }
+        
     
     /**
      * Multiple ledgers may use the same BookieHandle object, so we keep
      * a count on the number of references.
      */
-    int incRefCount(){
+    int incRefCount(LedgerHandle lh){
+        ledgers.add(lh);
         return ++refCount;
     }
     
     /**
      * Halts if there is no ledger using this object.
+     *
+     * @return  int reference counter
      */
-    int halt(){
+    synchronized int halt(LedgerHandle lh){
+        LOG.info("Calling halt");
+        ledgers.remove(lh);
         int currentCount = --refCount;
         if(currentCount <= 0){
-            stop = true;
+            shutdown();
         }
         
         if(currentCount < 0)
             LOG.warn("Miscalculated the number of reference counts: " + addr);
-        
+
         return currentCount;
     }
+    
+    /**
+     * Halt this bookie handle independent of the number of ledgers using it. Called upon a 
+     * failure to write. This method cannot be called by this thread because it may cause a
+     * deadlock as shutdown invokes sendStop. The deadlock comes from sendAdd blocking on
+     * incomingQueue when the queue is full and the thread also blocking on it when
+     * trying to send the stop marker. Because this thread is actually the consumer, if it
+     * does not make progress, then we have a deadlock. 
+     * 
+     * @return int  reference counter
+     */
+    synchronized public int halt(){
+        if(!stop){
+            LOG.info("Calling halt");
+            for(LedgerHandle lh : ledgers)
+                lh.removeBookie(this);
+            refCount = 0;
+            shutdown();
+        }
+     
+        return refCount;
+    }
+    
+    /**
+     * Stop this bookie handle completely.
+     * 
+     */
+    public void shutdown(){
+        if(!stop){
+            LOG.info("Calling shutdown");
+            LOG.debug("Halting client");
+            client.halt();
+            LOG.debug("Cleaning queue");
+            sendStop();
+            LOG.debug("Finished shutdown"); 
+        }
+    }
+    
+    /**
+     * Invokes the callback method for pending requests in the queue
+     * of this BookieHandle.
+     */
+    private void cleanQueue(){
+        stop = true;
+        ToSend ts = incomingQueue.poll();
+        while(ts != null){
+            switch(ts.type){
+            case Operation.ADD:
+                SubAddOp aOp = (SubAddOp) ts.ctx;
+                aOp.wcb.writeComplete(-1, ts.lh.getId(), ts.entry, ts.ctx);
+     
+                break;
+            case Operation.READ:                
+                ((SubReadOp) ts.ctx).rcb.readEntryComplete(-1, ts.lh.getId(), ts.entry, null, ts.ctx);
+                break;
+            }
+            ts = incomingQueue.poll();
+        }
+    }
+                
+    /**
+     * Returns the negated value of stop, which gives the status of the
+     * BookieHandle.
+     */
+    
+    boolean isEnabled(){
+        return !stop;
+    }
 }
 
     

+ 12 - 12
src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/ClientCBWorker.java

@@ -38,17 +38,17 @@ import org.apache.log4j.Logger;
  */
 
 class ClientCBWorker extends Thread{
-    Logger LOG = Logger.getLogger(ClientCBWorker.class);
+    static Logger LOG = Logger.getLogger(ClientCBWorker.class);
     static ClientCBWorker instance = null;
     
-    private boolean stop = false;
+    private volatile boolean stop;
     private static int instanceCounter= 0;
     
     ArrayBlockingQueue<Operation> pendingOps;
     QuorumOpMonitor monitor;
     
     
-    static synchronized ClientCBWorker getInstance(){
+    static ClientCBWorker getInstance(){
         if(instance == null){
             instance = new ClientCBWorker();
         }
@@ -63,9 +63,10 @@ class ClientCBWorker extends Thread{
      * 
      */
     ClientCBWorker(){
-       pendingOps = new ArrayBlockingQueue<Operation>(4000);  
+       pendingOps = new ArrayBlockingQueue<Operation>(6000);  
+       stop = false;
        start();
-       LOG.debug("Have started cbWorker");
+       LOG.info("Have started cbWorker");
     }
     
     
@@ -84,11 +85,11 @@ class ClientCBWorker extends Thread{
      * Gets thread out of its main loop.
      * 
      */
-    synchronized void shutdown(){
+    void shutdown(){
         if((--instanceCounter) == 0){
             stop = true;
             instance = null;
-            LOG.debug("Shutting down");
+            LOG.info("Shutting down CBWorker");
         }
     }
     
@@ -105,14 +106,14 @@ class ClientCBWorker extends Thread{
                 if(op != null){
                     synchronized(op){
                         while(!op.isReady()){
-                            op.wait();
+                            op.wait(1000);
                         }
                     }
-
+                    
                     switch(op.type){
                     case Operation.ADD:
                         AddOp aOp = (AddOp) op;
-                    
+                       
                         aOp.getLedger().setAddConfirmed(aOp.entry);
                         aOp.cb.addComplete(aOp.getErrorCode(),
                                 aOp.getLedger(),
@@ -122,14 +123,13 @@ class ClientCBWorker extends Thread{
                         break;
                     case Operation.READ:
                         ReadOp rOp = (ReadOp) op;
-                        //LOG.debug("Got one message from the queue: " + rOp.firstEntry);
                         rOp.cb.readComplete(rOp.getErrorCode(), 
                                 rOp.getLedger(),
                                 new LedgerSequence(rOp.seq), 
                                 rOp.ctx);
                         break;
                     }
-                }
+                } 
             }
         } catch (InterruptedException e){
            LOG.error("Exception while waiting on queue or operation"); 

+ 218 - 25
src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerHandle.java

@@ -28,7 +28,11 @@ import java.nio.ByteBuffer;
 import java.security.NoSuchAlgorithmException;
 import java.security.MessageDigest;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.TreeMap;
 
+import org.apache.bookkeeper.client.BKDefs;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookieHandle;
 import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
@@ -56,7 +60,7 @@ public class LedgerHandle implements ReadCallback, AddCallback {
      * ledgerhandle->write->bookeeper->quorumengine->bookiehandle
      * ->bookieclient
      */
-    Logger LOG = Logger.getLogger(LedgerHandle.class);
+   static Logger LOG = Logger.getLogger(LedgerHandle.class);
     
     public enum QMode {VERIFIABLE, GENERIC, FREEFORM};
     
@@ -64,12 +68,16 @@ public class LedgerHandle implements ReadCallback, AddCallback {
     private long ledger;
     private volatile long last;
     private volatile long lastAddConfirmed = 0;
-    private ArrayList<BookieHandle> bookies;
+    private HashMap<Integer, Long> lastRecvCorrectly;
+    private volatile ArrayList<BookieHandle> bookies;
     private ArrayList<InetSocketAddress> bookieAddrList;
+    private TreeMap<Long, ArrayList<BookieHandle> > bookieConfigMap;
+    private long[] entryChange;
     private BookKeeper bk;
     private QuorumEngine qe;
     private int qSize;
     private QMode qMode = QMode.VERIFIABLE;
+    private int lMode;
 
     private int threshold;
     private String digestAlg = "SHA1";
@@ -94,6 +102,7 @@ public class LedgerHandle implements ReadCallback, AddCallback {
         this.ledger = ledger;
         this.last = last;
         this.bookies = new ArrayList<BookieHandle>();
+        this.lastRecvCorrectly = new HashMap<Integer, Long>();
         this.passwd = passwd;
         genLedgerKey(passwd);
         genMacKey(passwd);
@@ -122,6 +131,8 @@ public class LedgerHandle implements ReadCallback, AddCallback {
         this.ledger = ledger;
         this.last = last;
         this.bookies = new ArrayList<BookieHandle>();
+        this.lastRecvCorrectly = new HashMap<Integer, Long>();
+
 
         this.qSize = qSize;
         this.qMode = mode;
@@ -150,6 +161,8 @@ public class LedgerHandle implements ReadCallback, AddCallback {
         this.ledger = ledger;
         this.last = last;
         this.bookies = new ArrayList<BookieHandle>();
+        this.lastRecvCorrectly = new HashMap<Integer, Long>();
+
 
         this.qSize = qSize;
         this.passwd = passwd;
@@ -165,7 +178,7 @@ public class LedgerHandle implements ReadCallback, AddCallback {
     			LOG.debug("Opening bookieHandle: " + a);
             
     			//BookieHandle bh = new BookieHandle(this, a);
-    			this.bookies.add(bk.getBookieHandle(a));
+    			this.bookies.add(bk.getBookieHandle(this, a));
     		}
     	} catch(ConnectException e){
     		LOG.error(e);
@@ -198,15 +211,37 @@ public class LedgerHandle implements ReadCallback, AddCallback {
      * 
      * @param addr	socket address
      */
-    int addBookie(InetSocketAddress addr)
+    int addBookieForWriting(InetSocketAddress addr)
     throws IOException {
         LOG.debug("Bookie address: " + addr);
+        lMode = BKDefs.WRITE;
         //BookieHandle bh = new BookieHandle(this, addr);
-        this.bookies.add(bk.getBookieHandle(addr));
+        this.bookies.add(bk.getBookieHandle(this, addr));
         if(bookies.size() > qSize) setThreshold();
         return (this.bookies.size() - 1);
     }
     
+    /**
+     * Creates a bookie handle for a ledger that is being read and appends
+     * it to the list of bookies. If the handle cannot be obtained because
+     * of an I/O error, a decoy handle for the same address is appended
+     * instead (presumably so that the remaining bookies keep their index
+     * in the list -- to be confirmed against BookieHandle).
+     * 
+     * @param addr  socket address of the bookie
+     * @return int  index of the handle that was just appended
+     * @throws IOException  declared for callers; a failure of
+     *                      getBookieHandle is handled inside this method
+     */
+    int addBookieForReading(InetSocketAddress addr)
+    throws IOException {
+        LOG.debug("Bookie address: " + addr);
+        lMode = BKDefs.READ;
+        try{
+            this.bookies.add(bk.getBookieHandle(this, addr));
+        } catch (IOException e){
+            // Log the cause as well: it is the only trace of why the bookie was unreachable
+            LOG.info("Inserting a decoy bookie handle", e);
+            this.bookies.add(new BookieHandle(addr, false));
+        }
+        if(bookies.size() > qSize){
+            setThreshold();
+        }
+        return (this.bookies.size() - 1);
+    }
+
+    
     private void setThreshold() {
         switch(qMode){
         case GENERIC:
@@ -225,6 +260,47 @@ public class LedgerHandle implements ReadCallback, AddCallback {
         return threshold;
     }
     
+    
+    /**
+     * Writes to ZooKeeper the ensemble in use after a change. The
+     * space-separated addresses of the bookies currently in the list are
+     * stored in a child of the quorum evolution node of this ledger; the
+     * child is named after the zero-padded entry identifier. The quorum
+     * evolution node is created first if it does not exist yet. Failures
+     * are logged and not propagated to the caller.
+     *         
+     * @param entry Last entry written before change of ensemble.
+     */
+    
+    void changeEnsemble(long entry){
+        String entryName = String.format("%010d", entry);
+        String evolutionPath = BKDefs.prefix + 
+        bk.getZKStringId(getId()) +  
+        BKDefs.quorumEvolution;
+        String path = evolutionPath + "/" + entryName;
+        
+        LOG.info("Report failure: " + entryName);
+        try{
+            // The parent node only has to be created on the first change
+            if(bk.getZooKeeper().exists(evolutionPath, false) == null){
+                bk.getZooKeeper().create(evolutionPath, new byte[0], 
+                        Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+            }
+        
+            StringBuilder addresses = new StringBuilder();
+            for(BookieHandle bh : bookies){
+                if(addresses.length() > 0){
+                    addresses.append(" ");
+                }
+                addresses.append(bh.addr.toString());
+            }
+            
+            bk.getZooKeeper().create(path, addresses.toString().getBytes(),
+                    Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        } catch(Exception e){
+            // Hand the exception to the logger so that the stack trace is kept
+            LOG.error("Could not write to ZooKeeper: " + path, e);
+        }
+    }
+    
     /**
      * Replace bookie in the case of a failure 
      */
@@ -250,7 +326,7 @@ public class LedgerHandle implements ReadCallback, AddCallback {
                 /*
                  * If successful in writing to new bookie, add it to the set
                  */
-                this.bookies.set(index, bk.getBookieHandle(addr));
+                this.bookies.set(index, bk.getBookieHandle(this, addr));
             } catch(ConnectException e){
                 bk.blackListBookie(addr);
                 LOG.error(e);
@@ -266,16 +342,23 @@ public class LedgerHandle implements ReadCallback, AddCallback {
      * to replace the current faulty one. In such cases,
      * we simply remove the bookie.
      * 
-     * @param index
-     */
-    void removeBookie(int index){
-        bookies.remove(index);
-    }
-    
-    void closeUp(){
-        ledger = -1;
-        last = -1;
-        bk.haltBookieHandles(bookies);
+     * 
+     * @param bh  handle of the bookie to remove
+     */
+    synchronized void removeBookie(BookieHandle bh){
+       // Nothing to do unless the ledger is in write mode
+       if(lMode != BKDefs.WRITE){
+           return;
+       }
+       
+       LOG.info("Removing bookie: " + bh.addr);
+       int position = bookies.indexOf(bh);
+       if(position < 0){
+           return;
+       }
+       
+       // Read the value stored for this position before the list shrinks
+       Long lastStored = lastRecvCorrectly.get(position);
+       bookies.remove(position);
+       
+       // Entry 0 is reported when nothing has been stored for the position
+       changeEnsemble(lastStored == null ? 0 : lastStored);
+    }
     
     
@@ -328,6 +411,11 @@ public class LedgerHandle implements ReadCallback, AddCallback {
         return lastAddConfirmed;
     }
     
+    void setLastRecvCorrectly(int sId, long entry){
+        // Keeps one value per identifier sId: put overwrites what was stored before.
+        lastRecvCorrectly.put(sId, entry);
+    }
+    
     /**
      * Returns the list of bookies
      * @return ArrayList<BookieHandle>
@@ -336,6 +424,73 @@ public class LedgerHandle implements ReadCallback, AddCallback {
         return bookies;
     }
     
+    /**
+     * Returns the list of bookies that applies to the given entry. The
+     * lookup is delegated to getConfig, so the result can differ from the
+     * current list once ensemble changes have been registered through
+     * setNewBookieConfig.
+     * 
+     * @param entry  entry identifier
+     * @return ArrayList<BookieHandle>  returns list of bookies
+     */
+    ArrayList<BookieHandle> getBookies(long entry){
+        return getConfig(entry);
+    }
+    
+    /**
+     * Searches the current list of bookies for the handle whose address
+     * equals the one given.
+     * 
+     * @param addr  socket address to look for
+     * @return BookieHandle  first handle with that address, or null if
+     *                       no handle in the list matches
+     */
+    BookieHandle getBookieHandleDup(InetSocketAddress addr){
+        BookieHandle match = null;
+        
+        for(BookieHandle candidate : bookies){
+            if(candidate.addr.equals(addr)){
+                match = candidate;
+                break;
+            }
+        }
+        
+        return match;
+    }
+    
+    /**
+     * Registers the bookie configuration associated with an ensemble
+     * change. The map of configurations is created on first use, and the
+     * current list of bookies is registered under entry 0 when no
+     * configuration exists for that key yet.
+     * 
+     * @param entry  entry identifier used as key for the configuration
+     * @param list   bookie handles that form the configuration
+     */
+    
+    void setNewBookieConfig(long entry, ArrayList<BookieHandle> list){
+        if(bookieConfigMap == null){
+            bookieConfigMap = new TreeMap<Long, ArrayList<BookieHandle> >();
+        }
+        
+        /*
+         * The initial configuration always has to be present.
+         */
+        Long initialKey = Long.valueOf(0);
+        if(!bookieConfigMap.containsKey(initialKey)){
+            bookieConfigMap.put(initialKey, bookies);
+        }
+        
+        LOG.info("Adding new entry: " + entry + ", " + bookies.size() + ", " + list.size());
+        bookieConfigMap.put(entry, list);
+    }
+    
+    /**
+     * Builds the array of keys of bookieConfigMap, in the iteration
+     * order of its key set. It has to be called once all changes to the
+     * bookie configuration have been registered, because getConfig
+     * searches this array to find the configuration of an entry.
+     * 
+     * The array is a performance optimization and is not necessary
+     * for correctness: bookieConfigMap alone would be enough, only
+     * slower.
+     */
+    
+    void prepareEntryChange(){
+        entryChange = new long[bookieConfigMap.size()];
+    
+        int slot = 0;
+        for(Long changePoint : bookieConfigMap.keySet()){
+            entryChange[slot] = changePoint;
+            slot++;
+        }
+    }
+    
     /**
      * Return the quorum size. By default, the size of a quorum is (n+1)/2, 
      * where n is the size of the set of bookies.
@@ -345,6 +500,36 @@ public class LedgerHandle implements ReadCallback, AddCallback {
         return qSize;   
     }
     
+    
+    /**
+     *  Returns the bookie configuration that applies to an entry. When
+     *  no configuration change has been registered, the current list of
+     *  bookies is returned. Otherwise the entry is located in entryChange
+     *  with a binary search and the configuration stored for the key just
+     *  before the resulting position is returned (the first key when the
+     *  position is 0).
+     *  
+     * @param entry  entry identifier
+     * @return ArrayList<BookieHandle>  configuration for the entry
+     */
+    private ArrayList<BookieHandle> getConfig(long entry){
+        if(bookieConfigMap == null){
+            return bookies;
+        }
+        
+        /*
+         * A negative result encodes the insertion point as
+         * (-(insertion point) - 1); decode it into a position.
+         */
+        int found = Arrays.binarySearch(entryChange, entry);
+        int position = (found < 0) ? (-found - 1) : found;
+
+        if(position != 0){
+            return bookieConfigMap.get(entryChange[position - 1]);
+        }
+        
+        if((entry % 10) == 0){
+            LOG.info("Index: " + position + ", " + found + ", " + entry + ", " + bookieConfigMap.get(entryChange[position]).size());
+        }
+        return bookieConfigMap.get(entryChange[position]); 
+    }
+    
     /**
      * Returns the quorum mode for this ledger: Verifiable or Generic
      */
@@ -440,12 +625,18 @@ public class LedgerHandle implements ReadCallback, AddCallback {
        return ledgerKey; 
     }
     
+    void closeUp(){
+        ledger = -1;  // -1 marks the ledger identifier as no longer valid
+        last = -1;
+        bk.haltBookieHandles(this, bookies);
+    }
+    
     /**
      * Close ledger.
      * 
      */
     public void close() 
-    throws KeeperException, InterruptedException {
+    throws KeeperException, InterruptedException, BKException {
         //Set data on zookeeper
         ByteBuffer last = ByteBuffer.allocate(8);
         last.putLong(lastAddConfirmed);
@@ -456,13 +647,12 @@ public class LedgerHandle implements ReadCallback, AddCallback {
                    last.array(), 
                    Ids.OPEN_ACL_UNSAFE, 
                    CreateMode.PERSISTENT); 
-        } else {
-            bk.getZooKeeper().setData(closePath, 
-                last.array(), -1);
-        }
+        } 
+        
         closeUp();
         StopOp sOp = new StopOp();
         qe.sendOp(sOp);
+        LOG.info("##### CB worker queue size: " + qe.cbWorker.pendingOps.size());
     }
     
     /**
@@ -515,7 +705,7 @@ public class LedgerHandle implements ReadCallback, AddCallback {
         
         RetCounter counter = new RetCounter();
         counter.inc();
-        
+     
         Operation r = new ReadOp(this, firstEntry, lastEntry, this, counter);
         qe.sendOp(r);
         
@@ -523,7 +713,10 @@ public class LedgerHandle implements ReadCallback, AddCallback {
         counter.block(0);
         LOG.debug("Done with waiting: " + counter.i + ", " + firstEntry);
         
-        if(counter.getSequence() == null) throw BKException.create(Code.ReadException);
+        if(counter.getSequence() == null){
+            LOG.error("Failed to read entries: " + firstEntry + ", " + lastEntry);
+            throw BKException.create(Code.ReadException);
+        }
         return counter.getSequence();
     }
    
@@ -535,7 +728,7 @@ public class LedgerHandle implements ReadCallback, AddCallback {
      * @param ctx   some control object
      */
     public void asyncAddEntry(byte[] data, AddCallback cb, Object ctx)
-    throws InterruptedException {
+    throws InterruptedException, BKException {
         AddOp r = new AddOp(this, data, cb, ctx);
         qe.sendOp(r);
     }
@@ -548,7 +741,7 @@ public class LedgerHandle implements ReadCallback, AddCallback {
      */
     
     public long addEntry(byte[] data)
-    throws InterruptedException{
+    throws InterruptedException, BKException{
         LOG.debug("Adding entry " + data);
         RetCounter counter = new RetCounter();
         counter.inc();

+ 142 - 14
src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerManagementProcessor.java

@@ -95,7 +95,7 @@ implements StatCallback, StringCallback, ChildrenCallback, DataCallback {
         /**
          * Set value of action
          * 
-         * @return
+         * @return int  return action identifier
          */
         int setAction(int action){
             return this.action = action;
@@ -104,7 +104,7 @@ implements StatCallback, StringCallback, ChildrenCallback, DataCallback {
         /**
          * Return value of action
          * 
-         * @return
+         * @return  int  return action identifier
          */
         int getAction(){
             return action;
@@ -122,7 +122,7 @@ implements StatCallback, StringCallback, ChildrenCallback, DataCallback {
         /**
          * Return return code
          * 
-         * @return
+         * @return int return code
          */
         int getRC(){
             return rc;
@@ -365,7 +365,11 @@ implements StatCallback, StringCallback, ChildrenCallback, DataCallback {
         private int qSize;
         private long last;
         private QMode qMode;
-        private List<String> bookieIds;
+        private List<String> children;
+        
+        private String dataString;
+        private String item;
+        private AtomicInteger counter;
 
         /**
          * Constructor of request to open a ledger.
@@ -468,8 +472,8 @@ implements StatCallback, StringCallback, ChildrenCallback, DataCallback {
          * 
          * @param list  list of bookie identifiers
          */
-        void addBookieIds(List<String> list){
-            this.bookieIds = list;
+        void addChildren(List<String> list){
+            this.children = list;
         }
         
         /**
@@ -477,8 +481,55 @@ implements StatCallback, StringCallback, ChildrenCallback, DataCallback {
          * 
          * @return List<String> list of bookie identifiers
          */
-        List<String> getBookieIds(){
-            return bookieIds;
+        List<String> getChildren(){
+            return children;
+        }
+        
+        /**
+         * Returns the size of the children list. Used in processOpen to
+         * detect when the last ensemble change has been processed. The
+         * list has to be set with addChildren before this is called.
+         * 
+         * @return int  number of elements in the children list
+         */
+        int getListSize(){
+            return children.size();
+        }
+        
+        /**
+         * Sets the value of item. This is used in processOpen to
+         * keep the name of the ensemble change node being read; that
+         * name is later parsed as the entry identifier of the change.
+         * 
+         * @param item  name of a child of the quorum evolution node
+         */
+        void setItem(String item){
+            this.item = item;
+        }
+        
+        /**
+         * Returns the value of item, as last stored with setItem.
+         * 
+         * @return String  value of item
+         */
+        
+        String getItem(){
+            return item;
+        }
+        
+        /**
+         * Sets the value of dataString. The data callback stores here
+         * the content read from an ensemble change node, converted to
+         * a String.
+         * 
+         * @param data  value to set
+         */
+        void setStringData(String data){
+            this.dataString = data;
+        }
+        
+        /**
+         * Returns the value of dataString. processOpen splits it on
+         * spaces to obtain the bookie addresses of a configuration.
+         * 
+         * @return String  value of dataString
+         */
+        String getStringData(){
+            return dataString;
+        }
     }
     
@@ -731,7 +782,7 @@ implements StatCallback, StringCallback, ChildrenCallback, DataCallback {
                     String bookie = children.remove(index);
                     LOG.info("Bookie: " + bookie);
                     InetSocketAddress tAddr = bk.parseAddr(bookie);
-                    int bindex = cop.getLh().addBookie(tAddr); 
+                    int bindex = cop.getLh().addBookieForWriting(tAddr); 
                     ByteBuffer bindexBuf = ByteBuffer.allocate(4);
                     bindexBuf.putInt(bindex);
                 
@@ -773,6 +824,9 @@ implements StatCallback, StringCallback, ChildrenCallback, DataCallback {
         if(oop.getRC() != BKDefs.EOK)
             oop.getCb().openComplete(oop.getRC(), null, oop.getCtx());
         
+        String path;
+        LedgerHandle lh;
+        
         switch(oop.getAction()){
         case 0:                    
             /*
@@ -833,7 +887,7 @@ implements StatCallback, StringCallback, ChildrenCallback, DataCallback {
             /*
              *  Create ledger handle
              */
-            LedgerHandle lh = new LedgerHandle(bk, oop.getLid(), oop.getLast(), oop.getQSize(), oop.getQMode(), oop.getPasswd());
+            lh = new LedgerHandle(bk, oop.getLid(), oop.getLast(), oop.getQSize(), oop.getQMode(), oop.getPasswd());
                 
             /*
              * Get children of "/ledgers/id/ensemble" 
@@ -846,7 +900,7 @@ implements StatCallback, StringCallback, ChildrenCallback, DataCallback {
             break;
 
         case 7:
-            List<String> list = oop.getBookieIds();
+            List<String> list = oop.getChildren();
             LOG.info("Length of list of bookies: " + list.size());
             try{
                 for(int i = 0 ; i < list.size() ; i++){
@@ -855,19 +909,81 @@ implements StatCallback, StringCallback, ChildrenCallback, DataCallback {
                                 false, new Stat());
                         ByteBuffer bindexBuf = ByteBuffer.wrap(bindex);
                         if(bindexBuf.getInt() == i){                      
-                            oop.getLh().addBookie(bk.parseAddr(s));
+                            oop.getLh().addBookieForReading(bk.parseAddr(s));
                         }
                     }
                 }
+
+                /*
+                 * Check if there has been any change to the ensemble of bookies
+                 * due to failures.
+                 */
+                bk.getZooKeeper().exists(BKDefs.prefix + 
+                        bk.getZKStringId(oop.getLid()) +  
+                        BKDefs.quorumEvolution, 
+                                false,
+                                this,
+                                oop);
+                        
             } catch(KeeperException e){
                 LOG.error("Exception while adding bookies", e);
                 oop.setRC(BKDefs.EZK);
+                oop.getCb().openComplete(oop.getRC(), oop.getLh(), oop.getCtx());
             } catch(IOException e){
                 LOG.error("Exception while trying to connect to bookie");
                 oop.setRC(BKDefs.EIO);
-            } finally {
+                oop.getCb().openComplete(oop.getRC(), oop.getLh(), oop.getCtx());
+            } 
+            
+             break;
+        
+        case 8:
+            path = BKDefs.prefix + 
+            bk.getZKStringId(oop.getLid()) +  
+            BKDefs.quorumEvolution;
+                
+            bk.getZooKeeper().getChildren(path, 
+                    false,
+                    this,
+                    oop);
+        case 9: 
+            oop.getCb().openComplete(oop.getRC(), oop.getLh(), oop.getCtx());
+            break;
+        case 10:        
+            path = BKDefs.prefix + 
+            bk.getZKStringId(oop.getLid()) +  
+            BKDefs.quorumEvolution;
+            
+            for(String s : oop.getChildren()){
+                oop.setItem(s);
+                bk.getZooKeeper().getData(path + "/" + s, 
+                        false, 
+                        this,
+                        oop);
+            }
+            
+            break;
+        case 11:
+            lh = oop.getLh();
+            
+            String parts[] = oop.getStringData().split(" ");
+
+            ArrayList<BookieHandle> newBookieSet = new ArrayList<BookieHandle>();
+            for(int i = 0 ; i < parts.length ; i++){
+                LOG.info("Address: " + parts[i]);
+                InetSocketAddress faultyBookie =  
+                    bk.parseAddr(parts[i].substring(1));                           
+        
+                newBookieSet.add(lh.getBookieHandleDup(faultyBookie));
+            }
+            lh.setNewBookieConfig(Long.parseLong(oop.getItem()), newBookieSet);
+        
+            if(oop.counter.incrementAndGet() == oop.getListSize()){
+                lh.prepareEntryChange();
                 oop.getCb().openComplete(oop.getRC(), oop.getLh(), oop.getCtx());
             }
+            
+            break;
         }
     }    
     
@@ -956,6 +1072,12 @@ implements StatCallback, StringCallback, ChildrenCallback, DataCallback {
                 else
                     op.setAction(4);
                 break;
+            case 8:
+                if(stat == null)
+                    op.setAction(9);
+                else
+                    op.setAction(10);
+                break;
             }
         case CLOSE:
             CloseLedgerOp clop = (CloseLedgerOp) op;
@@ -1064,7 +1186,7 @@ implements StatCallback, StringCallback, ChildrenCallback, DataCallback {
                   break;
               case OPEN:
                   OpenLedgerOp oop = (OpenLedgerOp) op;
-                  oop.addBookieIds(children);
+                  oop.addChildren(children);
                   break;
        }
        
@@ -1119,6 +1241,12 @@ implements StatCallback, StringCallback, ChildrenCallback, DataCallback {
                            oop.setQMode(QMode.VERIFIABLE);
                        LOG.info("Verifiable ledger");
                        }
+                       break;
+                   case 10:
+                       String addr = new String(data);
+                       oop.setStringData(addr);
+                       oop.setAction(11);
+                       break;
                    }
                    break;
                default:

+ 5 - 4
src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerRecoveryMonitor.java

@@ -31,6 +31,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.HashMap;
 import java.util.TreeMap;
 
+//import org.apache.bookkeeper.client.AsyncCallback.FailCallback;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.LedgerSequence;
@@ -48,7 +49,7 @@ import org.apache.zookeeper.KeeperException;
  * 
  */
 
-class LedgerRecoveryMonitor implements ReadEntryCallback{
+class LedgerRecoveryMonitor implements ReadEntryCallback {
     Logger LOG = Logger.getLogger(LedgerRecoveryMonitor.class);
     
     BookKeeper self;
@@ -132,11 +133,10 @@ class LedgerRecoveryMonitor implements ReadEntryCallback{
         
         /*
          * Obtain largest hint 
-         */
-        
+         */ 
         LedgerHandle lh = new LedgerHandle(self, lId, 0, qSize, qMode, passwd);
         for(InetSocketAddress addr : bookies){
-            lh.addBookie(addr);
+            lh.addBookieForReading(addr);
         }
         
         boolean notLegitimate = true;
@@ -241,4 +241,5 @@ class LedgerRecoveryMonitor implements ReadEntryCallback{
         
         return hint;
     }
+    
 }

+ 49 - 16
src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/QuorumEngine.java

@@ -22,9 +22,12 @@ package org.apache.bookkeeper.client;
 
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BKException.Code;
 import org.apache.bookkeeper.client.ClientCBWorker;
 import org.apache.bookkeeper.client.QuorumOpMonitor;
 import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
@@ -43,7 +46,7 @@ import org.apache.log4j.Logger;
  */
 
 public class QuorumEngine {
-    Logger LOG = Logger.getLogger(QuorumEngine.class);
+    static Logger LOG = Logger.getLogger(QuorumEngine.class);
 
     QuorumOpMonitor opMonitor;
     ClientCBWorker cbWorker;
@@ -56,6 +59,11 @@ public class QuorumEngine {
      * ADD, STOP.
      */
     
+    static long idCounter; // next operation identifier; static, hence shared by all engines
+    static synchronized long getOpId(){ // NOTE(review): the QuorumEngine constructor resets idCounter to 0 without this lock, so identifiers can repeat
+        return idCounter++;
+    }
+    
     public static class Operation {
         public static final int READ = 0;
         public static final int ADD = 1;
@@ -64,9 +72,19 @@ public class QuorumEngine {
         
         int type;
         LedgerHandle ledger;
+        long id;
         int rc = 0;
         boolean ready = false;
         
+         public Operation(){
+             this.id = getOpId(); // identifier taken from the static counter of QuorumEngine
+         }
+            
+         long getId(){
+             return id; // value assigned by the constructor
+         }
+            
+
         public static class AddOp extends Operation {
             AddCallback cb;
             Object ctx;
@@ -178,11 +196,18 @@ public class QuorumEngine {
              this.rcb = rcb;
          }
      }
+     
+     public static class SubStopOp extends SubOp{ // sub-operation that only records the operation it belongs to
+         SubStopOp(Operation op){
+             this.op = op;
+         }
+     }
     }
     
     public QuorumEngine(LedgerHandle lh){ 
         this.lh = lh;
-        this.opMonitor = QuorumOpMonitor.getInstance(lh);
+        this.opMonitor = new QuorumOpMonitor(lh);
+        QuorumEngine.idCounter = 0;
         LOG.debug("Creating cbWorker");
         this.cbWorker = ClientCBWorker.getInstance();
         LOG.debug("Created cbWorker");
@@ -195,11 +220,12 @@ public class QuorumEngine {
      * @param r Operation descriptor
      */
     void sendOp(Operation r)
-    throws InterruptedException {
+    throws InterruptedException, BKException {
+        int n;    
         
-        int n = lh.getBookies().size();
         switch(r.type){
         case Operation.READ:
+            
             Operation.ReadOp rOp = (Operation.ReadOp) r;
             
             LOG.debug("Adding read operation to opMonitor: " + rOp.firstEntry + ", " + rOp.lastEntry);
@@ -211,6 +237,10 @@ public class QuorumEngine {
                 long counter = 0;
                 PendingReadOp pROp = new PendingReadOp(lh);
                 
+                n = lh.getBookies(entry).size();
+                if(n < lh.getQuorumSize())
+                    throw BKException.create(Code.NotEnoughBookiesException);
+                
                 //Send requests to bookies
                 while(counter < lh.getQuorumSize()){
                     int index = (int)((entry + counter++) % n);
@@ -219,7 +249,9 @@ public class QuorumEngine {
                                 pROp, 
                                 index,
                                 opMonitor);
-                        lh.getBookies().get((index) % n).sendRead(lh, sRead, entry);            
+   
+                        BookieHandle bh = lh.getBookies(entry).get((index) % n); 
+                        if(bh.isEnabled()) bh.sendRead(lh, sRead, entry);            
                     } catch(IOException e){
                         LOG.error(e);
                     }
@@ -228,11 +260,18 @@ public class QuorumEngine {
   
             break;
         case Operation.ADD:
+            n = lh.getBookies().size();
+
+            if(n < lh.getQuorumSize())
+                throw BKException.create(Code.NotEnoughBookiesException);
+            
             long counter = 0;
             
             cbWorker.addOperation(r);
             Operation.AddOp aOp = (Operation.AddOp) r;
             PendingOp pOp = new PendingOp();
+            ArrayList<BookieHandle> bookies;
+            
             while(counter < lh.getQuorumSize()  ){
                 int index = (int)((aOp.entry + counter++) % n);
                 
@@ -242,20 +281,14 @@ public class QuorumEngine {
                             pOp, 
                             index,
                             opMonitor);
+                   
                     lh.getBookies().get((index) % n).sendAdd(lh, sAdd, aOp.entry);
-                } catch (IOException io) {
-                    LOG.error(io);
-                    try{
-                        /*
-                         * Before getting a new bookie, try to reconnect
-                         */
-                        lh.getBookies().get((index) % n).restart();
-                    } catch (IOException nio){
-                        lh.removeBookie(index);
-                    }
+                } catch (Exception io) {
+                    LOG.error("Error when sending entry: " + aOp.entry + ", " + index + ", " + io);
+                    counter--;
+                    n = lh.getBookies().size();
                 }
             }
-            //qRef = (qRef + 1) % n;
             break;
                 case Operation.STOP:
                     cbWorker.shutdown();

+ 46 - 34
src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/QuorumOpMonitor.java

@@ -55,21 +55,12 @@ import org.apache.log4j.Logger;
  * 
  */
 public class QuorumOpMonitor implements WriteCallback, ReadEntryCallback {
-    Logger LOG = Logger.getLogger(QuorumOpMonitor.class);
+    static Logger LOG = Logger.getLogger(QuorumOpMonitor.class);
     
     LedgerHandle lh;
     
     static final int MAXRETRIES = 2;
-    static HashMap<Long, QuorumOpMonitor> instances = 
-        new HashMap<Long, QuorumOpMonitor>();
     
-    public static QuorumOpMonitor getInstance(LedgerHandle lh){
-        if(instances.get(lh.getId()) == null) {
-            instances.put(lh.getId(), new QuorumOpMonitor(lh));
-        }
-        
-        return instances.get(lh.getId());
-    }
     
     /**
      * Message digest instance
@@ -160,17 +151,22 @@ public class QuorumOpMonitor implements WriteCallback, ReadEntryCallback {
         if(rc == 0){
             // Everything went ok with this op
             synchronized(pOp){ 
-                //pOp.bookieIdSent.add(sId);
                 pOp.bookieIdRecv.add(sId);
-                if(pOp.bookieIdRecv.size() == lh.getQuorumSize()){
-                    //pendingAdds.remove(entryId);
-                    //sAdd.op.cb.addComplete(sAdd.op.getErrorCode(),
-                    //        ledgerId, entryId, sAdd.op.ctx);
+                lh.setLastRecvCorrectly(sId, entryId);
+                if(pOp.bookieIdRecv.size() >= lh.getQuorumSize()){
                     sAdd.op.setReady();     
                 }
             }
         } else {
-            LOG.error("Error sending write request: " + rc + " : " + ledgerId);
+            //LOG.warn("Error sending write request: " + rc + " : " + ledgerId + ": " + lh.getBookies().size());
+            /*
+             * If ledger is closed already, then simply return
+             */
+            if(lh.getId() == -1){
+                LOG.warn("Ledger identifier is not valid");
+                return;
+            }
+            
             HashSet<Integer> ids;
               
             synchronized(pOp){
@@ -180,8 +176,7 @@ public class QuorumOpMonitor implements WriteCallback, ReadEntryCallback {
                 if(ids.size() == lh.getBookies().size()){
                     if(pOp.retries++ >= MAXRETRIES){
                         //Call back with error code
-                        //sAdd.op.cb.addComplete(ErrorCodes.ENUMRETRIES,
-                        //        ledgerId, entryId, sAdd.op.ctx);
+  
                         sAdd.op.setErrorCode(BKDefs.ENR);
                         sAdd.op.setReady();
                         return;
@@ -190,25 +185,38 @@ public class QuorumOpMonitor implements WriteCallback, ReadEntryCallback {
                     ids.clear();
                 }
                 // Select another bookie that we haven't contacted yet
-                for(int i = 0; i < lh.getBookies().size(); i++){
-                    if(!ids.contains(Integer.valueOf(i))){
-                        // and send it to new bookie
-                        try{
-                            list.get(i).sendAdd(lh, new SubAddOp(sAdd.op, 
-                                    pOp, 
-                                    i, 
-                                    this), ((AddOp) sAdd.op).entry);
-                            pOp.bookieIdRecv.add(sId.intValue());
-                                
-                            break;
-                        } catch(IOException e){
-                            LOG.error(e);
-                        }
+                try{
+                    //LOG.info("Selecting another bookie " + entryId);
+                    int bCounter;
+                    if(sId >= (entryId % (lh.getBookies().size() + 1))){
+                        bCounter = sId - (((int) entryId) % (lh.getBookies().size() + 1));
+                    } else {
+                        bCounter = (lh.getBookies().size() + 1) - (((int) entryId) % (lh.getBookies().size() + 1)) - sId;
                     }
-                }       
+                    
+                    int tmpId = (((int) entryId) + lh.getQuorumSize()) % (lh.getBookies().size() + 1);
+                    int newId = tmpId % lh.getBookies().size();
+                    //LOG.info("Sending a new add operation to bookie: " + newId + ", " + lh.getBookies().get(newId).addr);
+                    
+                    BookieHandle bh = lh.getBookies().get(newId);
+                    
+                    //LOG.info("Got handle for " + newId);
+                    
+                    bh.sendAdd(lh, new SubAddOp(sAdd.op, 
+                            pOp, 
+                            newId, 
+                            this), entryId);
+               
+                    //LOG.info("Ended " + entryId + ", " + newId);
+                } catch(IOException e){
+                    LOG.error(e);
+                } catch(BKException e){
+                    LOG.error(e);
+                }
             }
-        }
+        }       
     }
+
     
     /**
      * Callback method for read operations. There is one callback for
@@ -256,6 +264,7 @@ public class QuorumOpMonitor implements WriteCallback, ReadEntryCallback {
                                     byte[] data = new byte[voted.capacity() - dLength - 24];
                                     voted.position(24);                                    
                                     voted.get(data, 0, data.length);
+                                    //LOG.warn("Data length (" + entryId + "): " + data.length);
                                     counter = addNewEntry(new LedgerEntry(ledgerId, entryId, data), rOp);
                                 } 
                             }
@@ -338,6 +347,7 @@ public class QuorumOpMonitor implements WriteCallback, ReadEntryCallback {
         
         if(rOp.nacks.get(entryId).incrementAndGet() >= lh.getThreshold()){
             int counter = -1;
+            //LOG.warn("Giving up on " + entryId + "(" + lh.getThreshold() + ")");
             counter = addNewEntry(new LedgerEntry(ledgerId, entryId, null), rOp);
             
             if((counter == (rOp.lastEntry - rOp.firstEntry + 1)) && 
@@ -450,6 +460,8 @@ public class QuorumOpMonitor implements WriteCallback, ReadEntryCallback {
     private int addNewEntry(LedgerEntry le, ReadOp op){
         long index = le.getEntryId() % (op.lastEntry - op.firstEntry + 1);
         if(op.seq[(int) index] == null){
+            if(le.getEntry() == null) LOG.warn("Ledger entry is null (" + le.getEntryId() + ")");
+            //if(le.getEntryId() % 100 == 0) LOG.info("New entry: " + le.getEntryId() + ")");
             op.seq[(int) index] = le;
             
             return op.counter.incrementAndGet();

+ 120 - 56
src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/BookieClient.java

@@ -28,12 +28,16 @@ import java.nio.ByteBuffer;
 import java.nio.channels.SocketChannel;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.Enumeration;
 import java.security.NoSuchAlgorithmException;
 import java.security.InvalidKeyException;
 import java.security.MessageDigest;
 import javax.crypto.Mac; 
 import javax.crypto.spec.SecretKeySpec;
 
+//import org.apache.bookkeeper.client.AsyncCallback.FailCallback;
+import org.apache.bookkeeper.client.BookieHandle;
 import org.apache.bookkeeper.proto.ReadEntryCallback;
 import org.apache.bookkeeper.proto.WriteCallback;
 import org.apache.log4j.Logger;
@@ -50,13 +54,8 @@ public class BookieClient extends Thread {
     int myCounter = 0;
 
     public BookieClient(InetSocketAddress addr, int recvTimeout)
-    throws IOException, ConnectException {
-        sock = SocketChannel.open(addr);
-        setDaemon(true);
-   
-        sock.socket().setSoTimeout(recvTimeout);
-        sock.socket().setTcpNoDelay(true);
-        start();
+    throws IOException, ConnectException { 
+        startConnection(addr, recvTimeout);
     }
     
     public BookieClient(String host, int port, int recvTimeout)
@@ -64,6 +63,16 @@ public class BookieClient extends Thread {
         this(new InetSocketAddress(host, port), recvTimeout);
     }
     
+    public void startConnection(InetSocketAddress addr, int recvTimeout)
+    throws IOException, ConnectException {
+        sock = SocketChannel.open(addr);
+        setDaemon(true);
+        //sock.configureBlocking(false);
+        sock.socket().setSoTimeout(recvTimeout);
+        sock.socket().setTcpNoDelay(true);
+        start();        
+    }
+    
     private static class Completion<T> {
         Completion(T cb, Object ctx) {
             this.cb = cb;
@@ -105,13 +114,12 @@ public class BookieClient extends Thread {
     ConcurrentHashMap<CompletionKey, Completion<ReadEntryCallback>> readCompletions =
         new ConcurrentHashMap<CompletionKey, Completion<ReadEntryCallback>>();
     
-    
     /*
      * Use this semaphore to control the number of completion keys in both addCompletions
      * and readCompletions. This is more of a problem for readCompletions because one
      * readEntries operation is expanded into individual operations to read entries.
      */
-    Semaphore completionSemaphore = new Semaphore(1000);
+    Semaphore completionSemaphore = new Semaphore(3000);
     
    
     /**
@@ -150,7 +158,10 @@ public class BookieClient extends Thread {
     }
     
     /**
-     * Send addEntry operation to bookie.
+     * Send addEntry operation to bookie. It throws an IOException
+     * if either the write to the socket fails or it takes too long
+     * to obtain a permit to send another request, which possibly 
+     * implies that the corresponding bookie is down.
      * 
      * @param ledgerId	ledger identifier
      * @param entryId 	entry identifier
@@ -163,39 +174,37 @@ public class BookieClient extends Thread {
             ByteBuffer entry, WriteCallback cb, Object ctx) 
     throws IOException, InterruptedException {
         
-        //LOG.info("Data length: " + entry.capacity());
-    	completionSemaphore.acquire();
+        if(cb == null)
+            LOG.error("WriteCallback object is null: " + entryId);
         addCompletions.put(new CompletionKey(ledgerId, entryId),
                 new Completion<WriteCallback>(cb, ctx));
-        //entry = entry.duplicate();
-        //entry.position(0);
-        
+
         ByteBuffer tmpEntry = ByteBuffer.allocate(entry.remaining() + 44);
 
         tmpEntry.position(4);
         tmpEntry.putInt(BookieProtocol.ADDENTRY);
         tmpEntry.put(masterKey);
-        //LOG.debug("Master key: " + new String(masterKey));
         tmpEntry.putLong(ledgerId);
         tmpEntry.putLong(entryId);
         tmpEntry.put(entry);
         tmpEntry.position(0);
         
-        //ByteBuffer len = ByteBuffer.allocate(4);
         // 4 bytes for the message type
         tmpEntry.putInt(tmpEntry.remaining() - 4);
         tmpEntry.position(0);
-        //sock.write(len);
-        //len.clear();
-        //len.putInt(BookieProtocol.ADDENTRY);
-        //len.flip();
-        //sock.write(len);
-        sock.write(tmpEntry);
-        //LOG.debug("addEntry:finished");
+
+        
+        if(!sock.isConnected() || 
+                !completionSemaphore.tryAcquire(1000, TimeUnit.MILLISECONDS)){ 
+            throw new IOException();
+        } else sock.write(tmpEntry);
     }
     
     /**
-     * Send readEntry operation to bookie.
+     * Send readEntry operation to bookie. It throws an IOException
+     * if either the write to the socket fails or it takes too long
+     * to obtain a permit to send another request, which possibly 
+     * implies that the corresponding bookie is down.
      * 
      * @param ledgerId	ledger identifier
      * @param entryId	entry identifier
@@ -206,28 +215,22 @@ public class BookieClient extends Thread {
     synchronized public void readEntry(long ledgerId, long entryId,
             ReadEntryCallback cb, Object ctx) 
     throws IOException, InterruptedException {
-    	
-    	completionSemaphore.acquire();
+        //LOG.info("Entry id: " + entryId);
+    	//completionSemaphore.acquire();
         readCompletions.put(new CompletionKey(ledgerId, entryId),
                 new Completion<ReadEntryCallback>(cb, ctx));
+        
         ByteBuffer tmpEntry = ByteBuffer.allocate(8 + 8 + 8);
         tmpEntry.putInt(20);
         tmpEntry.putInt(BookieProtocol.READENTRY);
         tmpEntry.putLong(ledgerId);
         tmpEntry.putLong(entryId);
         tmpEntry.position(0);
-        
-        //ByteBuffer len = ByteBuffer.allocate(4);
-        //len.putInt(tmpEntry.remaining() + 4);
-        //len.flip();
-        //LOG.debug("readEntry: Writing to socket");
-        //sock.write(len);
-        //len.clear();
-        //len.putInt(BookieProtocol.READENTRY);
-        //len.flip();
-        //sock.write(len);
-        sock.write(tmpEntry);
-        //LOG.error("Size of readCompletions: " + readCompletions.size());
+
+        if(!sock.isConnected() || 
+                !completionSemaphore.tryAcquire(1000, TimeUnit.MILLISECONDS)){ 
+            throw new IOException();
+        } else sock.write(tmpEntry);
     }
     
     private void readFully(ByteBuffer bb) throws IOException {
@@ -236,6 +239,7 @@ public class BookieClient extends Thread {
         }
     }
     
+    Semaphore running = new Semaphore(0);
     public void run() {
         int len = -1;
         ByteBuffer lenBuffer = ByteBuffer.allocate(4);
@@ -254,47 +258,44 @@ public class BookieClient extends Thread {
  
                 switch(type) {
                 case BookieProtocol.ADDENTRY:
-                {
+                {                    
                     long ledgerId = bb.getLong();
                     long entryId = bb.getLong();
-                    Completion<WriteCallback> ac = addCompletions.remove(new CompletionKey(ledgerId, entryId));
+
+                    Completion<WriteCallback> ac;
+                    ac = addCompletions.remove(new CompletionKey(ledgerId, entryId));
                     completionSemaphore.release();
-                    
                     if (ac != null) {
                         ac.cb.writeComplete(rc, ledgerId, entryId, ac.ctx);
                     } else {
                         LOG.error("Callback object null: " + ledgerId + " : " + entryId);
                     }
+
                     break;
                 }
                 case BookieProtocol.READENTRY:
                 {
-                    //ByteBuffer entryData = bb.slice();
                     long ledgerId = bb.getLong();
                     long entryId = bb.getLong();
                     
                     bb.position(24);
                     byte[] data = new byte[bb.capacity() - 24];
                     bb.get(data);
-                    ByteBuffer entryData = ByteBuffer.wrap(data);
-                    //ByteBuffer entryData = bb;
-                    //LOG.info("Received entry: " + ledgerId + ", " + entryId
-                    // + ", " + rc + ", " + entryData.array().length + ", " + bb.array().length + ", " + bb.remaining());          
+                    ByteBuffer entryData = ByteBuffer.wrap(data);         
                     
                     CompletionKey key = new CompletionKey(ledgerId, entryId);
                     Completion<ReadEntryCallback> c;
                     
                     if(readCompletions.containsKey(key)){
-                        c = readCompletions.remove(key);
-                        //LOG.error("Found key");
+                            c = readCompletions.remove(key);
                     }
                     else{    
-                        /*
-                         * This is a special case. When recovering a ledger, a client submits
-                         * a read request with id -1, and receives a response with a different
-                         * entry id.
-                         */
-                        c = readCompletions.remove(new CompletionKey(ledgerId, -1));
+                            /*
+                             * This is a special case. When recovering a ledger, a client submits
+                             * a read request with id -1, and receives a response with a different
+                             * entry id.
+                             */
+                            c = readCompletions.remove(new CompletionKey(ledgerId, -1));
                     }
                     completionSemaphore.release();
                     
@@ -311,9 +312,72 @@ public class BookieClient extends Thread {
                     System.err.println("Got error " + rc + " for type " + type);
                 }
             }
+            
         } catch(Exception e) {
-            LOG.error("Len = " + len + ", Type = " + type + ", rc = " + rc, e);
+            LOG.error("Len = " + len + ", Type = " + type + ", rc = " + rc);
         }
+        running.release();
+        
+    }
+    
+    /**
+     * Errors out pending entries. We call this method from a single thread to avoid
+     * concurrent invocations of the QuorumOpMonitor callbacks. It seems simpler to
+     * call it from BookieHandle than to call it directly from here.
+     */
+    
+    public void errorOut(){
+        LOG.info("Erroring out pending entries");
+    
+        for (Enumeration<CompletionKey> e = addCompletions.keys() ; e.hasMoreElements() ;) {
+            CompletionKey key = e.nextElement();
+            Completion<WriteCallback> ac = addCompletions.remove(key);
+            if(ac != null){
+                completionSemaphore.release();
+                ac.cb.writeComplete(-1, key.ledgerId, key.entryId, ac.ctx);
+            }
+        }
+        
+        LOG.info("Finished erroring out pending add entries");
+         
+        for (Enumeration<CompletionKey> e = readCompletions.keys() ; e.hasMoreElements() ;) {
+            CompletionKey key = e.nextElement();
+            Completion<ReadEntryCallback> ac = readCompletions.remove(key);
+                
+            if(ac != null){
+                completionSemaphore.release();
+                ac.cb.readEntryComplete(-1, key.ledgerId, key.entryId, null, ac.ctx);
+            }
+        }
+        
+        LOG.info("Finished erroring out pending read entries");
+    }
+
+    /**
+     * Halts client.
+     */
+    
+    public void halt() {
+        try{
+            sock.close();
+        } catch(IOException e) {
+            LOG.warn("Exception while closing socket");
+        }
+        
+        try{
+            running.acquire();
+        } catch(InterruptedException e){
+            LOG.error("Interrupted while waiting for running semaphore to acquire lock");
+        }
+    }
+    
+    /**
+     * Returns the status of the socket of this bookie client.
+     * 
+     * @return boolean
+     */
+    public boolean isConnected(){
+        return sock.isConnected();
     }
 
     private static class Counter {

+ 5 - 0
src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/BookieServer.java

@@ -39,6 +39,7 @@ import org.apache.log4j.Logger;
 public class BookieServer implements NIOServerFactory.PacketProcessor, WriteCallback {
     int port;
     NIOServerFactory nioServerFactory;
+    volatile boolean down = false;
     Bookie bookie;
     static Logger LOG = Logger.getLogger(BookieServer.class);
     
@@ -50,9 +51,13 @@ public class BookieServer implements NIOServerFactory.PacketProcessor, WriteCall
         nioServerFactory = new NIOServerFactory(port, this);
     }
     public void shutdown() throws InterruptedException {
+        down = true;
         nioServerFactory.shutdown();
         bookie.shutdown();
     }
+    public boolean isDown(){
+        return down;
+    }
     public void join() throws InterruptedException {
         nioServerFactory.join();
     }

+ 5 - 0
src/contrib/bookkeeper/src/java/org/apache/bookkeeper/streaming/LedgerOutputStream.java

@@ -24,6 +24,7 @@ import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
 
+import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.log4j.Logger;
 
@@ -87,6 +88,8 @@ public class LedgerOutputStream extends OutputStream {
             } catch(InterruptedException ie) {
                 LOG.warn("Interrupted while flusing " + ie);
                 Thread.currentThread().interrupt();
+            } catch(BKException bke) {
+                LOG.warn("BookKeeper exception ", bke);
             }
         }
     }
@@ -120,6 +123,8 @@ public class LedgerOutputStream extends OutputStream {
             } catch(InterruptedException ie) {
                 LOG.warn("Interrupted while writing", ie);
                 Thread.currentThread().interrupt();
+            } catch(BKException bke) {
+                LOG.warn("BookKeeper exception", bke);
             }
         }
     }

+ 10 - 8
src/contrib/bookkeeper/test/org/apache/bookkeeper/test/BookieClientTest.java

@@ -106,18 +106,18 @@ public class BookieClientTest extends TestCase {
         
         BookieClient bc = new BookieClient("127.0.0.1", port, 50000);
         ByteBuffer bb;
-        bb = createByteBuffer(1);
+        bb = createByteBuffer(1,1,1);
         bc.addEntry(1, passwd, 1, bb, wrcb, null);
-        bb = createByteBuffer(2);
+        bb = createByteBuffer(2,1,2);
         bc.addEntry(1, passwd, 2, bb, wrcb, null);
-        bb = createByteBuffer(3);
+        bb = createByteBuffer(3,1,3);
         bc.addEntry(1, passwd, 3, bb, wrcb, null);
-        bb = createByteBuffer(5);
+        bb = createByteBuffer(5,1,5);
         bc.addEntry(1, passwd, 5, bb, wrcb, null);
-        bb = createByteBuffer(7);
+        bb = createByteBuffer(7,1,7);
         bc.addEntry(1, passwd, 7, bb, wrcb, null);
         synchronized(notifyObject) {
-            bb = createByteBuffer(11);
+            bb = createByteBuffer(11,1,11);
             bc.addEntry(1, passwd, 11, bb, wrcb, notifyObject);
             notifyObject.wait();
         }
@@ -184,10 +184,12 @@ public class BookieClientTest extends TestCase {
             assertEquals(BookieProtocol.ENOENTRY, arc.rc);
         }
     }
-    private ByteBuffer createByteBuffer(int i) {
+    private ByteBuffer createByteBuffer(int i, long lid, long eid) {
         ByteBuffer bb;
-        bb = ByteBuffer.allocate(4);
+        bb = ByteBuffer.allocate(4+16);
         bb.putInt(i);
+        bb.putLong(lid);
+        bb.putLong(eid);
         bb.flip();
         return bb;
     }

+ 385 - 0
src/contrib/bookkeeper/test/org/apache/bookkeeper/test/BookieFailureTest.java

@@ -0,0 +1,385 @@
+package org.apache.bookkeeper.test;
+/*
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * 
+ */
+
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Random;
+import java.util.Set;
+
+import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.client.LedgerHandle.QMode;
+import org.apache.bookkeeper.client.LedgerSequence;
+import org.apache.bookkeeper.client.AsyncCallback.ReadCallback;
+import org.apache.bookkeeper.proto.BookieServer;
+import org.apache.bookkeeper.streaming.LedgerInputStream;
+import org.apache.bookkeeper.streaming.LedgerOutputStream;
+import org.apache.bookkeeper.util.ClientBase;
+import org.apache.log4j.ConsoleAppender;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.log4j.PatternLayout;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.server.NIOServerCnxn;
+import org.apache.zookeeper.server.ZooKeeperServer;
+
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.junit.Test;
+
+/**
+ * This test tests asynchronous writes and reads of integer
+ * entries for a BookKeeper client when a bookie fails during
+ * the writes. The test deployment uses a ZooKeeper server
+ * and four bookies.
+ * 
+ */
+
+public class BookieFailureTest 
+    extends junit.framework.TestCase 
+    implements AddCallback, ReadCallback{
+    
+
+    //Depending on the taste, select the amount of logging
+    // by uncommenting one of the two lines below
+    //static Logger LOG = Logger.getRootLogger();
+    static Logger LOG = Logger.getLogger(BookieReadWriteTest.class);
+
+    static ConsoleAppender ca = new ConsoleAppender(new PatternLayout());
+
+    // ZooKeeper related variables
+    private static final String HOSTPORT = "127.0.0.1:2181";
+    static Integer ZooKeeperDefaultPort = 2181;
+    ZooKeeperServer zks;
+    ZooKeeper zkc; //zookeeper client
+    NIOServerCnxn.Factory serverFactory;
+    File ZkTmpDir;
+    
+    //BookKeeper 
+    File tmpDirB1, tmpDirB2, tmpDirB3, tmpDirB4;
+    BookieServer bs1, bs2, bs3, bs4;
+    Integer initialPort = 5000;
+    BookKeeper bkc; // bookkeeper client
+    byte[] ledgerPassword = "aaa".getBytes();
+    LedgerHandle lh, lh2;
+    long ledgerId;
+    LedgerSequence ls;
+    
+    //test related variables 
+    int numEntriesToWrite = 20000;
+    int maxInt = 2147483647;
+    Random rng; // Random Number Generator 
+    ArrayList<byte[]> entries; // generated entries
+    ArrayList<Integer> entriesSize;
+    
+    // Synchronization
+    SyncObj sync;
+    Set<Object> syncObjs;
+    
+    class SyncObj {
+        int counter;
+        boolean value;      
+        public SyncObj() {
+            counter = 0;
+            value = false;
+        }       
+    }
+    
+   /**
+    * Tests writes and reads when a bookie fails.
+    *  
+    * @throws IOException
+    */
+    @Test
+    public void testAsyncBK1() throws IOException{ 
+        LOG.info("#### BK1 ####");
+        auxTestReadWriteAsyncSingleClient(bs1);
+    }
+   
+   @Test
+   public void testAsyncBK2() throws IOException{    
+       LOG.info("#### BK2 ####");
+       auxTestReadWriteAsyncSingleClient(bs2);
+   }
+   
+   @Test
+   public void testAsyncBK3() throws IOException{    
+       LOG.info("#### BK3 ####"); 
+       auxTestReadWriteAsyncSingleClient(bs3);
+   }
+   
+   @Test
+   public void testAsyncBK4() throws IOException{
+       LOG.info("#### BK4 ####");
+        auxTestReadWriteAsyncSingleClient(bs4);
+   }
+    
+    void auxTestReadWriteAsyncSingleClient(BookieServer bs) throws IOException{
+        try {
+            // Create a BookKeeper client and a ledger
+            bkc = new BookKeeper("127.0.0.1");
+            lh = bkc.createLedger(4, 2, QMode.VERIFIABLE, ledgerPassword);
+        
+            ledgerId = lh.getId();
+            LOG.info("Ledger ID: " + lh.getId());
+            for(int i = 0; i < numEntriesToWrite; i++){
+                ByteBuffer entry = ByteBuffer.allocate(4);
+                entry.putInt(rng.nextInt(maxInt));
+                entry.position(0);
+                
+                entries.add(entry.array());
+                entriesSize.add(entry.array().length);
+                lh.asyncAddEntry(entry.array(), this, sync);
+                if(i == 5000){
+                    //Bookie fail
+                    bs.shutdown();
+                }
+            }
+            
+            // wait for all entries to be acknowledged
+            synchronized (sync) {
+                while (sync.counter < numEntriesToWrite){
+                    LOG.debug("Entries counter = " + sync.counter);
+                    sync.wait();
+                }
+            }
+            
+            LOG.debug("*** WRITE COMPLETE ***");
+            // close ledger 
+            lh.close();
+            
+            //*** WRITING PART COMPLETE // READ PART BEGINS ***
+            
+            // open ledger
+            bkc = new BookKeeper("127.0.0.1");
+            lh = bkc.openLedger(ledgerId, ledgerPassword);
+            LOG.debug("Number of entries written: " + lh.getLast());
+            assertTrue("Verifying number of entries written", lh.getLast() == (numEntriesToWrite - 1));     
+            
+            //read entries
+            
+            lh.asyncReadEntries(0, numEntriesToWrite - 1, this, (Object) sync);
+            
+            synchronized (sync) {
+                while(sync.value == false){
+                    sync.wait(10000);
+                    assertTrue("Haven't received entries", sync.value);
+                }               
+            }
+            
+            assertTrue("Checking number of read entries", ls.size() == numEntriesToWrite);
+            
+            LOG.debug("*** READ COMPLETE ***");
+            
+            // at this point, LedgerSequence ls is filled with the returned values
+            int i = 0;
+            LOG.info("Size of ledger sequence: " + ls.size());
+            while(ls.hasMoreElements()){
+                ByteBuffer origbb = ByteBuffer.wrap(entries.get(i));
+                Integer origEntry = origbb.getInt();
+                byte[] entry = ls.nextElement().getEntry();
+                ByteBuffer result = ByteBuffer.wrap(entry);
+
+                Integer retrEntry = result.getInt();
+                LOG.debug("Retrieved entry: " + i);
+                assertTrue("Checking entry " + i + " for equality", origEntry.equals(retrEntry));
+                assertTrue("Checking entry " + i + " for size", entry.length == entriesSize.get(i).intValue());
+                i++;
+            }
+            
+            LOG.info("Verified that entries are ok, and now closing ledger");
+            lh.close();
+        } catch (KeeperException e) {
+            fail(e.toString());
+        } catch (BKException e) {
+            fail(e.toString());
+        } catch (InterruptedException e) {
+            fail(e.toString());
+        } 
+        
+    }
+    
+    public void addComplete(int rc, 
+            LedgerHandle lh, 
+            long entryId, 
+            Object ctx) {
+        if(rc != 0)
+            fail("Failed to write entry: " + entryId);
+        SyncObj x = (SyncObj) ctx;
+        synchronized (x) {
+            x.counter++;
+            x.notify();
+        }
+    }
+
+    public void readComplete(int rc, 
+            LedgerHandle lh, 
+            LedgerSequence seq,
+            Object ctx) {
+        if(rc != 0)
+            fail("Failed to write entry");
+        ls = seq;               
+        synchronized (sync) {
+            sync.value = true;
+            sync.notify();
+        }
+        
+    }
+    
+    /**
+     * Builds the fixture: a ZooKeeper server on ZooKeeperDefaultPort, the
+     * /ledgers/available znodes for four bookies, four running bookie
+     * servers on initialPort .. initialPort + 3, and fresh bookkeeping
+     * structures (rng, entries, entriesSize, sync).
+     *
+     * Any failure while bringing up the fixture aborts setUp instead of
+     * being printed and ignored, so tests never run against a half-built
+     * environment.
+     *
+     * @throws IOException if a temporary directory or server cannot be
+     *         created, or if the znodes cannot be registered
+     * @throws InterruptedException if startup or a ZooKeeper call is interrupted
+     */
+    protected void setUp() throws IOException, InterruptedException {
+        LOG.addAppender(ca);
+        LOG.setLevel((Level) Level.DEBUG);
+        
+        // create a ZooKeeper server(dataDir, dataLogDir, port)
+        LOG.debug("Running ZK server (setup)");
+        //ServerStats.registerAsConcrete();
+        ClientBase.setupTestEnv();
+        ZkTmpDir = newTempDir("zookeeper");
+
+        // Startup errors propagate through the declared exceptions.
+        zks = new ZooKeeperServer(ZkTmpDir, ZkTmpDir, ZooKeeperDefaultPort);
+        serverFactory = new NIOServerCnxn.Factory(ZooKeeperDefaultPort);
+        serverFactory.startup(zks);
+
+        boolean b = ClientBase.waitForServerUp(HOSTPORT, ClientBase.CONNECTION_TIMEOUT);
+        LOG.debug("Server up: " + b);
+        assertTrue("waiting for server up", b);
+        
+        // create a zookeeper client
+        LOG.debug("Instantiate ZK Client");
+        // NOTE(review): the second argument receives ZooKeeperDefaultPort; in the
+        // public ZooKeeper client API that position is the session timeout in
+        // milliseconds - confirm this is intended.
+        zkc = new ZooKeeper("127.0.0.1", ZooKeeperDefaultPort, new emptyWatcher());
+        
+        //initialize the zk client with values
+        try {
+            zkc.create("/ledgers", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+            zkc.create("/ledgers/available", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+            // one znode per bookie started below
+            for (int i = 0; i < 4; i++) {
+                zkc.create("/ledgers/available/127.0.0.1:" + Integer.toString(initialPort + i), new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+            }
+        } catch (KeeperException e) {
+            // KeeperException is not in the throws clause; wrap it so the
+            // cause is kept and the fixture fails fast.
+            LOG.error("Unable to register bookies in ZooKeeper", e);
+            zkc.close();
+            IOException ioe = new IOException("Unable to register bookies in ZooKeeper: " + e);
+            ioe.initCause(e);
+            throw ioe;
+        }
+        
+        // Create Bookie Servers (B1, B2, B3, B4)
+        tmpDirB1 = newTempDir("bookie1");
+        bs1 = new BookieServer(initialPort, tmpDirB1, new File[]{tmpDirB1});
+        bs1.start();
+        
+        tmpDirB2 = newTempDir("bookie2");
+        bs2 = new BookieServer(initialPort + 1, tmpDirB2, new File[]{tmpDirB2});
+        bs2.start();
+
+        tmpDirB3 = newTempDir("bookie3");
+        bs3 = new BookieServer(initialPort + 2, tmpDirB3, new File[]{tmpDirB3});
+        bs3.start();
+        
+        tmpDirB4 = newTempDir("bookie4");
+        bs4 = new BookieServer(initialPort + 3, tmpDirB4, new File[]{tmpDirB4});
+        bs4.start();
+        
+        rng = new Random(System.currentTimeMillis());   // Initialize the Random Number Generator 
+        entries = new ArrayList<byte[]>(); // initialize the  entries list
+        entriesSize = new ArrayList<Integer>(); 
+        sync = new SyncObj(); // initialize the synchronization data structure
+        
+        zkc.close();
+    }
+
+    /*  Create an empty temporary directory whose name starts with prefix */
+    private File newTempDir(String prefix) throws IOException {
+        // createTempFile reserves a unique name; swap the file for a directory
+        File dir = File.createTempFile(prefix, "test");
+        dir.delete();
+        if (!dir.mkdir()) {
+            throw new IOException("Could not create temporary directory " + dir);
+        }
+        return dir;
+    }
+    
+    /**
+     * Tears the fixture down: halts the BookKeeper client, shuts down the
+     * bookie servers that are still up, removes their directories, then
+     * shuts down the ZooKeeper server and removes its directory.
+     *
+     * The ZooKeeper shutdown runs in a finally block so that a failure
+     * while stopping the client or the bookies cannot leave the server
+     * running.
+     *
+     * @throws InterruptedException if a shutdown call is interrupted
+     */
+    protected void tearDown() throws InterruptedException {
+        LOG.info("TearDown");
+        try {
+            // setUp never assigns bkc, so it may still be null here
+            if (bkc != null) {
+                bkc.halt();
+            }
+            
+            //shutdown bookie servers 
+            if(!bs1.isDown()) bs1.shutdown();
+            if(!bs2.isDown()) bs2.shutdown();
+            if(!bs3.isDown()) bs3.shutdown();
+            if(!bs4.isDown()) bs4.shutdown();
+                 
+            cleanUpDir(tmpDirB1);
+            cleanUpDir(tmpDirB2);
+            cleanUpDir(tmpDirB3);
+            cleanUpDir(tmpDirB4);
+        } finally {
+            //shutdown ZK server
+            serverFactory.shutdown();
+            assertTrue("waiting for server down",
+                    ClientBase.waitForServerDown(HOSTPORT,
+                                                 ClientBase.CONNECTION_TIMEOUT));
+            //ServerStats.unregister();
+            cleanUpDir(ZkTmpDir);
+        }
+    }
+
+    /*  Clean up a directory recursively */
+    /**
+     * Deletes a file, or a directory together with everything below it.
+     *
+     * @param dir file or directory to remove
+     * @return true if dir was deleted; false as soon as any child or dir
+     *         itself could not be deleted
+     */
+    protected boolean cleanUpDir(File dir){
+        if (dir.isDirectory()) {
+            LOG.info("Cleaning up " + dir.getName());
+            String[] children = dir.list();
+            // File.list() returns null when the listing fails (I/O error);
+            // skip the recursion then and let delete() report the outcome.
+            if (children != null) {
+                for (String child : children) {
+                    if (!cleanUpDir(new File(dir, child))) return false;
+                }
+            }
+        }
+        // The directory is now empty (or dir is a plain file) so delete it
+        return dir.delete();        
+    }
+
+    /*  Used for testing purposes: a Watcher that ignores every event */
+    class emptyWatcher implements Watcher {
+        public void process(WatchedEvent event) {
+            // intentionally empty
+        }
+    }
+    
+
+}

+ 408 - 404
src/contrib/bookkeeper/test/org/apache/bookkeeper/test/BookieReadWriteTest.java

@@ -63,52 +63,52 @@ import org.junit.Test;
  */
 
 public class BookieReadWriteTest 
-	extends junit.framework.TestCase 
-	implements AddCallback, ReadCallback{
+    extends junit.framework.TestCase 
+    implements AddCallback, ReadCallback{
 
-	//Depending on the taste, select the amount of logging
-	// by decommenting one of the two lines below
-	//static Logger LOG = Logger.getRootLogger();
-	static Logger LOG = Logger.getLogger(BookieClientTest.class);
+    // Depending on taste, select the amount of logging
+    // by uncommenting one of the two lines below
+    //static Logger LOG = Logger.getRootLogger();
+    static Logger LOG = Logger.getLogger(BookieReadWriteTest.class);
 
-	static ConsoleAppender ca = new ConsoleAppender(new PatternLayout());
+    static ConsoleAppender ca = new ConsoleAppender(new PatternLayout());
 
-	// ZooKeeper related variables
+    // ZooKeeper related variables
     private static final String HOSTPORT = "127.0.0.1:2181";
-	static Integer ZooKeeperDefaultPort = 2181;
-	ZooKeeperServer zks;
-	ZooKeeper zkc; //zookeeper client
-	NIOServerCnxn.Factory serverFactory;
-	File ZkTmpDir;
-	
-	//BookKeeper 
-	File tmpDirB1, tmpDirB2, tmpDirB3;
-	BookieServer bs1, bs2, bs3;
-	Integer initialPort = 5000;
-	BookKeeper bkc; // bookkeeper client
-	byte[] ledgerPassword = "aaa".getBytes();
-	LedgerHandle lh, lh2;
-	long ledgerId;
-	LedgerSequence ls;
-	
-	//test related variables 
-	int numEntriesToWrite = 20;
-	int maxInt = 2147483647;
-	Random rng; // Random Number Generator 
-	ArrayList<byte[]> entries; // generated entries
-	ArrayList<Integer> entriesSize;
-	
-	// Synchronization
-	SyncObj sync;
-	Set<Object> syncObjs;
-	
+    static Integer ZooKeeperDefaultPort = 2181;
+    ZooKeeperServer zks;
+    ZooKeeper zkc; //zookeeper client
+    NIOServerCnxn.Factory serverFactory;
+    File ZkTmpDir;
+    
+    //BookKeeper 
+    File tmpDirB1, tmpDirB2, tmpDirB3;
+    BookieServer bs1, bs2, bs3;
+    Integer initialPort = 5000;
+    BookKeeper bkc; // bookkeeper client
+    byte[] ledgerPassword = "aaa".getBytes();
+    LedgerHandle lh, lh2;
+    long ledgerId;
+    LedgerSequence ls;
+    
+    //test related variables 
+    int numEntriesToWrite = 200;
+    int maxInt = 2147483647;
+    Random rng; // Random Number Generator 
+    ArrayList<byte[]> entries; // generated entries
+    ArrayList<Integer> entriesSize;
+    
+    // Synchronization
+    SyncObj sync;
+    Set<Object> syncObjs;
+    
     class SyncObj {
-    	int counter;
-    	boolean value;    	
-    	public SyncObj() {
-			counter = 0;
-			value = false;
-		}    	
+        int counter;
+        boolean value;      
+        public SyncObj() {
+            counter = 0;
+            value = false;
+        }       
     }
     
     @Test
@@ -137,7 +137,7 @@ public class BookieReadWriteTest
         // create a buffer of a single bytes
         // and check for corner cases
         String toWrite = "we need to check for this string to match " +
-        		"and for the record mahadev is the best";
+                "and for the record mahadev is the best";
         LedgerOutputStream lout = new LedgerOutputStream(lh , 1);
         byte[] b = toWrite.getBytes();
         lout.write(b);
@@ -181,252 +181,255 @@ public class BookieReadWriteTest
         
     
     @Test
-	public void testReadWriteAsyncSingleClient() throws IOException{
-		try {
-			// Create a BookKeeper client and a ledger
-			bkc = new BookKeeper("127.0.0.1");
-			lh = bkc.createLedger(ledgerPassword);
-			//bkc.initMessageDigest("SHA1");
-			ledgerId = lh.getId();
-			LOG.info("Ledger ID: " + lh.getId());
-			for(int i = 0; i < numEntriesToWrite; i++){
-				ByteBuffer entry = ByteBuffer.allocate(4);
-				entry.putInt(rng.nextInt(maxInt));
-				entry.position(0);
-				
-				entries.add(entry.array());
-				entriesSize.add(entry.array().length);
-				lh.asyncAddEntry(entry.array(), this, sync);
-			}
-			
-			// wait for all entries to be acknowledged
-			synchronized (sync) {
-				while (sync.counter < numEntriesToWrite){
-					LOG.debug("Entries counter = " + sync.counter);
-					sync.wait();
-				}
-			}
-			
-			LOG.debug("*** WRITE COMPLETE ***");
-			// close ledger 
-			lh.close();
-			
-			//*** WRITING PART COMPLETE // READ PART BEGINS ***
-			
-			// open ledger
-			lh = bkc.openLedger(ledgerId, ledgerPassword);
-			LOG.debug("Number of entries written: " + lh.getLast());
-			assertTrue("Verifying number of entries written", lh.getLast() == (numEntriesToWrite - 1));		
-			
-			//read entries
-			lh.asyncReadEntries(0, numEntriesToWrite - 1, this, (Object) sync);
-			
-			synchronized (sync) {
-				while(sync.value == false){
-					sync.wait();
-				}				
-			}
-			
-			assertTrue("Checking number of read entries", ls.size() == numEntriesToWrite);
-			
-			LOG.debug("*** READ COMPLETE ***");
-			
-			// at this point, LedgerSequence ls is filled with the returned values
-			int i = 0;
-			while(ls.hasMoreElements()){
-			    ByteBuffer origbb = ByteBuffer.wrap(entries.get(i));
-				Integer origEntry = origbb.getInt();
-				byte[] entry = ls.nextElement().getEntry();
-				ByteBuffer result = ByteBuffer.wrap(entry);
-				LOG.debug("Length of result: " + result.capacity());
-				LOG.debug("Original entry: " + origEntry);
+    public void testReadWriteAsyncSingleClient() throws IOException{
+        try {
+            // Create a BookKeeper client and a ledger
+            bkc = new BookKeeper("127.0.0.1");
+            lh = bkc.createLedger(ledgerPassword);
+            //bkc.initMessageDigest("SHA1");
+            ledgerId = lh.getId();
+            LOG.info("Ledger ID: " + lh.getId());
+            for(int i = 0; i < numEntriesToWrite; i++){
+                ByteBuffer entry = ByteBuffer.allocate(4);
+                entry.putInt(rng.nextInt(maxInt));
+                entry.position(0);
+                
+                entries.add(entry.array());
+                entriesSize.add(entry.array().length);
+                lh.asyncAddEntry(entry.array(), this, sync);
+            }
+            
+            // wait for all entries to be acknowledged
+            synchronized (sync) {
+                while (sync.counter < numEntriesToWrite){
+                    LOG.debug("Entries counter = " + sync.counter);
+                    sync.wait();
+                }
+            }
+            
+            LOG.debug("*** WRITE COMPLETE ***");
+            // close ledger 
+            lh.close();
+            
+            //*** WRITING PART COMPLETE // READ PART BEGINS ***
+            
+            // open ledger
+            lh = bkc.openLedger(ledgerId, ledgerPassword);
+            LOG.debug("Number of entries written: " + lh.getLast());
+            assertTrue("Verifying number of entries written", lh.getLast() == (numEntriesToWrite - 1));     
+            
+            //read entries
+            lh.asyncReadEntries(0, numEntriesToWrite - 1, this, (Object) sync);
+            
+            synchronized (sync) {
+                while(sync.value == false){
+                    sync.wait();
+                }               
+            }
+            
+            assertTrue("Checking number of read entries", ls.size() == numEntriesToWrite);
+            
+            LOG.debug("*** READ COMPLETE ***");
+            
+            // at this point, LedgerSequence ls is filled with the returned values
+            int i = 0;
+            while(ls.hasMoreElements()){
+                ByteBuffer origbb = ByteBuffer.wrap(entries.get(i));
+                Integer origEntry = origbb.getInt();
+                byte[] entry = ls.nextElement().getEntry();
+                ByteBuffer result = ByteBuffer.wrap(entry);
+                LOG.debug("Length of result: " + result.capacity());
+                LOG.debug("Original entry: " + origEntry);
 
-				Integer retrEntry = result.getInt();
-				LOG.debug("Retrieved entry: " + retrEntry);
-				assertTrue("Checking entry " + i + " for equality", origEntry.equals(retrEntry));
-				assertTrue("Checking entry " + i + " for size", entry.length == entriesSize.get(i).intValue());
-				i++;
-			}
-			lh.close();
-		} catch (KeeperException e) {
-			e.printStackTrace();
-		} catch (BKException e) {
-			e.printStackTrace();
-		} catch (InterruptedException e) {
-			e.printStackTrace();
-		} //catch (NoSuchAlgorithmException e) {
-		//	e.printStackTrace();
-		//}
-		
-	}
-	
+                Integer retrEntry = result.getInt();
+                LOG.debug("Retrieved entry: " + retrEntry);
+                assertTrue("Checking entry " + i + " for equality", origEntry.equals(retrEntry));
+                assertTrue("Checking entry " + i + " for size", entry.length == entriesSize.get(i).intValue());
+                i++;
+            }
+            lh.close();
+        } catch (KeeperException e) {
+            LOG.error("Test failed", e);
+            fail("Test failed due to ZooKeeper exception");
+        } catch (BKException e) {
+            LOG.error("Test failed", e);
+            fail("Test failed due to BookKeeper exception");
+        } catch (InterruptedException e) {
+            LOG.error("Test failed", e);
+            fail("Test failed due to interruption");
+        } 
+    }
+    
     @Test
-	public void testSyncReadAsyncWriteStringsSingleClient() throws IOException{
-    	LOG.info("TEST READ WRITE STRINGS MIXED SINGLE CLIENT");
-		String charset = "utf-8";
-		LOG.debug("Default charset: "  + Charset.defaultCharset());
-		try {
-			// Create a BookKeeper client and a ledger
-			bkc = new BookKeeper("127.0.0.1");
-			lh = bkc.createLedger(ledgerPassword);
-			//bkc.initMessageDigest("SHA1");
-			ledgerId = lh.getId();
-			LOG.info("Ledger ID: " + lh.getId());
-			for(int i = 0; i < numEntriesToWrite; i++){
-				int randomInt = rng.nextInt(maxInt);
-				byte[] entry = new String(Integer.toString(randomInt)).getBytes(charset);
-				entries.add(entry);
-				lh.asyncAddEntry(entry, this, sync);
-			}
-			
-			// wait for all entries to be acknowledged
-			synchronized (sync) {
-				while (sync.counter < numEntriesToWrite){
-					LOG.debug("Entries counter = " + sync.counter);
-					sync.wait();
-				}
-			}
-			
-			LOG.debug("*** ASYNC WRITE COMPLETE ***");
-			// close ledger 
-			lh.close();
-			
-			//*** WRITING PART COMPLETED // READ PART BEGINS ***
-			
-			// open ledger
-			lh = bkc.openLedger(ledgerId, ledgerPassword);
-			LOG.debug("Number of entries written: " + lh.getLast());
-			assertTrue("Verifying number of entries written", lh.getLast() == (numEntriesToWrite - 1));		
-			
-			//read entries			
-			ls = lh.readEntries(0, numEntriesToWrite - 1);
-			
-			assertTrue("Checking number of read entries", ls.size() == numEntriesToWrite);
-			
-			LOG.debug("*** SYNC READ COMPLETE ***");
-			
-			// at this point, LedgerSequence ls is filled with the returned values
-			int i = 0;
-			while(ls.hasMoreElements()){
-				byte[] origEntryBytes = entries.get(i++);
-				byte[] retrEntryBytes = ls.nextElement().getEntry();
-				
-				LOG.debug("Original byte entry size: " + origEntryBytes.length);
-				LOG.debug("Saved byte entry size: " + retrEntryBytes.length);
-				
-				String origEntry = new String(origEntryBytes, charset);
-				String retrEntry = new String(retrEntryBytes, charset);
-				
-				LOG.debug("Original entry: " + origEntry);
-				LOG.debug("Retrieved entry: " + retrEntry);
-				
-				assertTrue("Checking entry " + i + " for equality", origEntry.equals(retrEntry));
-			}
-			lh.close();
-		} catch (KeeperException e) {
-			e.printStackTrace();
-		} catch (BKException e) {
-			e.printStackTrace();
-		} catch (InterruptedException e) {
-			e.printStackTrace();
-		} //catch (NoSuchAlgorithmException e) {
-		//	e.printStackTrace();
-		//}
-		
-	}
+    /**
+     * Writes numEntriesToWrite random integers, encoded as decimal
+     * strings in UTF-8, with asynchronous adds; then closes and reopens
+     * the ledger, reads the entries back synchronously and checks that
+     * each one equals what was written.
+     *
+     * @throws IOException declared by the test signature (for example an
+     *         unsupported charset name)
+     */
+    public void testSyncReadAsyncWriteStringsSingleClient() throws IOException{
+        LOG.info("TEST READ WRITE STRINGS MIXED SINGLE CLIENT");
+        String charset = "utf-8";
+        LOG.debug("Default charset: "  + Charset.defaultCharset());
+        try {
+            // Create a BookKeeper client and a ledger
+            bkc = new BookKeeper("127.0.0.1");
+            lh = bkc.createLedger(ledgerPassword);
+            //bkc.initMessageDigest("SHA1");
+            ledgerId = lh.getId();
+            LOG.info("Ledger ID: " + lh.getId());
+            for(int i = 0; i < numEntriesToWrite; i++){
+                int randomInt = rng.nextInt(maxInt);
+                byte[] entry = Integer.toString(randomInt).getBytes(charset);
+                entries.add(entry);
+                lh.asyncAddEntry(entry, this, sync);
+            }
+            
+            // wait for all entries to be acknowledged
+            synchronized (sync) {
+                while (sync.counter < numEntriesToWrite){
+                    LOG.debug("Entries counter = " + sync.counter);
+                    sync.wait();
+                }
+            }
+            
+            LOG.debug("*** ASYNC WRITE COMPLETE ***");
+            // close ledger 
+            lh.close();
+            
+            //*** WRITING PART COMPLETED // READ PART BEGINS ***
+            
+            // open ledger
+            lh = bkc.openLedger(ledgerId, ledgerPassword);
+            LOG.debug("Number of entries written: " + lh.getLast());
+            assertTrue("Verifying number of entries written", lh.getLast() == (numEntriesToWrite - 1));     
+            
+            //read entries          
+            ls = lh.readEntries(0, numEntriesToWrite - 1);
+            
+            assertTrue("Checking number of read entries", ls.size() == numEntriesToWrite);
+            
+            LOG.debug("*** SYNC READ COMPLETE ***");
+            
+            // at this point, LedgerSequence ls is filled with the returned values
+            int i = 0;
+            while(ls.hasMoreElements()){
+                byte[] origEntryBytes = entries.get(i);
+                byte[] retrEntryBytes = ls.nextElement().getEntry();
+                
+                LOG.debug("Original byte entry size: " + origEntryBytes.length);
+                LOG.debug("Saved byte entry size: " + retrEntryBytes.length);
+                
+                String origEntry = new String(origEntryBytes, charset);
+                String retrEntry = new String(retrEntryBytes, charset);
+                
+                LOG.debug("Original entry: " + origEntry);
+                LOG.debug("Retrieved entry: " + retrEntry);
+                
+                // i is advanced only after the check so the message names
+                // the entry that was actually compared
+                assertEquals("Checking entry " + i + " for equality", origEntry, retrEntry);
+                i++;
+            }
+            lh.close();
+        } catch (KeeperException e) {
+            LOG.error("Test failed", e);
+            fail("Test failed due to ZooKeeper exception");
+        } catch (BKException e) {
+            LOG.error("Test failed", e);
+            fail("Test failed due to BookKeeper exception");
+        } catch (InterruptedException e) {
+            // keep the interrupt visible to the test runner
+            Thread.currentThread().interrupt();
+            LOG.error("Test failed", e);
+            fail("Test failed due to interruption");
+        } 
+        
+    }
     
     @Test
     public void testReadWriteSyncSingleClient() throws IOException {
-		try {
-			// Create a BookKeeper client and a ledger
-			bkc = new BookKeeper("127.0.0.1");
-			lh = bkc.createLedger(ledgerPassword);
-			//bkc.initMessageDigest("SHA1");
-			ledgerId = lh.getId();
-			LOG.info("Ledger ID: " + lh.getId());
-			for(int i = 0; i < numEntriesToWrite; i++){
-				ByteBuffer entry = ByteBuffer.allocate(4);
-				entry.putInt(rng.nextInt(maxInt));
-				entry.position(0);
-				entries.add(entry.array());				
-				lh.addEntry(entry.array());
-			}
-			lh.close();
-			lh = bkc.openLedger(ledgerId, ledgerPassword);
-			LOG.debug("Number of entries written: " + lh.getLast());
-			assertTrue("Verifying number of entries written", lh.getLast() == (numEntriesToWrite - 1));		
-			
-			ls = lh.readEntries(0, numEntriesToWrite - 1);
-			int i = 0;
-			while(ls.hasMoreElements()){
-			    ByteBuffer origbb = ByteBuffer.wrap(entries.get(i++));
-				Integer origEntry = origbb.getInt();
-				ByteBuffer result = ByteBuffer.wrap(ls.nextElement().getEntry());
-				LOG.debug("Length of result: " + result.capacity());
-				LOG.debug("Original entry: " + origEntry);
+        try {
+            // Create a BookKeeper client and a ledger
+            bkc = new BookKeeper("127.0.0.1");
+            lh = bkc.createLedger(ledgerPassword);
+            //bkc.initMessageDigest("SHA1");
+            ledgerId = lh.getId();
+            LOG.info("Ledger ID: " + lh.getId());
+            for(int i = 0; i < numEntriesToWrite; i++){
+                ByteBuffer entry = ByteBuffer.allocate(4);
+                entry.putInt(rng.nextInt(maxInt));
+                entry.position(0);
+                entries.add(entry.array());             
+                lh.addEntry(entry.array());
+            }
+            lh.close();
+            lh = bkc.openLedger(ledgerId, ledgerPassword);
+            LOG.debug("Number of entries written: " + lh.getLast());
+            assertTrue("Verifying number of entries written", lh.getLast() == (numEntriesToWrite - 1));     
+            
+            ls = lh.readEntries(0, numEntriesToWrite - 1);
+            int i = 0;
+            while(ls.hasMoreElements()){
+                ByteBuffer origbb = ByteBuffer.wrap(entries.get(i++));
+                Integer origEntry = origbb.getInt();
+                ByteBuffer result = ByteBuffer.wrap(ls.nextElement().getEntry());
+                LOG.debug("Length of result: " + result.capacity());
+                LOG.debug("Original entry: " + origEntry);
 
-				Integer retrEntry = result.getInt();
-				LOG.debug("Retrieved entry: " + retrEntry);
-				assertTrue("Checking entry " + i + " for equality", origEntry.equals(retrEntry));
-			}
-			lh.close();
-		} catch (KeeperException e) {
-			e.printStackTrace();
-		} catch (BKException e) {
-			e.printStackTrace();
-		} catch (InterruptedException e) {
-			e.printStackTrace();
-		} //catch (NoSuchAlgorithmException e) {
-		//	e.printStackTrace();
-		//}
-	}
-	
+                Integer retrEntry = result.getInt();
+                LOG.debug("Retrieved entry: " + retrEntry);
+                assertTrue("Checking entry " + i + " for equality", origEntry.equals(retrEntry));
+            }
+            lh.close();
+        } catch (KeeperException e) {
+            LOG.error("Test failed", e);
+            fail("Test failed due to ZooKeeper exception");
+        } catch (BKException e) {
+            LOG.error("Test failed", e);
+            fail("Test failed due to BookKeeper exception");
+        } catch (InterruptedException e) {
+            LOG.error("Test failed", e);
+            fail("Test failed due to interruption");
+        } 
+    }
+    
     @Test
     public void testReadWriteZero() throws IOException {
-		try {
-			// Create a BookKeeper client and a ledger
-			bkc = new BookKeeper("127.0.0.1");
-			lh = bkc.createLedger(ledgerPassword);
-			//bkc.initMessageDigest("SHA1");
-			ledgerId = lh.getId();
-			LOG.info("Ledger ID: " + lh.getId());
-			for(int i = 0; i < numEntriesToWrite; i++){				
-			lh.addEntry(new byte[0]);
-			}
-			
-			/*
-			 * Write a non-zero entry
-			 */
-			ByteBuffer entry = ByteBuffer.allocate(4);
-			entry.putInt(rng.nextInt(maxInt));
-			entry.position(0);
-			entries.add(entry.array());				
-			lh.addEntry( entry.array());
-			
-			lh.close();
-			lh = bkc.openLedger(ledgerId, ledgerPassword);
-			LOG.debug("Number of entries written: " + lh.getLast());
-			assertTrue("Verifying number of entries written", lh.getLast() == numEntriesToWrite);		
-			
-			ls = lh.readEntries(0, numEntriesToWrite - 1);
-			int i = 0;
-			while(ls.hasMoreElements()){
-				ByteBuffer result = ByteBuffer.wrap(ls.nextElement().getEntry());
-				LOG.debug("Length of result: " + result.capacity());
-				
-				assertTrue("Checking if entry " + i + " has zero bytes", result.capacity() == 0);
-			}
-			lh.close();
-		} catch (KeeperException e) {
-			e.printStackTrace();
-		} catch (BKException e) {
-			e.printStackTrace();
-		} catch (InterruptedException e) {
-			e.printStackTrace();
-		} //catch (NoSuchAlgorithmException e) {
-		//	e.printStackTrace();
-		//}
-	}
+        try {
+            // Create a BookKeeper client and a ledger
+            bkc = new BookKeeper("127.0.0.1");
+            lh = bkc.createLedger(ledgerPassword);
+            //bkc.initMessageDigest("SHA1");
+            ledgerId = lh.getId();
+            LOG.info("Ledger ID: " + lh.getId());
+            for(int i = 0; i < numEntriesToWrite; i++){             
+            lh.addEntry(new byte[0]);
+            }
+            
+            /*
+             * Write a non-zero entry
+             */
+            ByteBuffer entry = ByteBuffer.allocate(4);
+            entry.putInt(rng.nextInt(maxInt));
+            entry.position(0);
+            entries.add(entry.array());             
+            lh.addEntry( entry.array());
+            
+            lh.close();
+            lh = bkc.openLedger(ledgerId, ledgerPassword);
+            LOG.debug("Number of entries written: " + lh.getLast());
+            assertTrue("Verifying number of entries written", lh.getLast() == numEntriesToWrite);       
+            
+            ls = lh.readEntries(0, numEntriesToWrite - 1);
+            int i = 0;
+            while(ls.hasMoreElements()){
+                ByteBuffer result = ByteBuffer.wrap(ls.nextElement().getEntry());
+                LOG.debug("Length of result: " + result.capacity());
+                
+                assertTrue("Checking if entry " + i + " has zero bytes", result.capacity() == 0);
+            }
+            lh.close();
+        } catch (KeeperException e) {
+            LOG.error("Test failed", e);
+            fail("Test failed due to ZooKeeper exception");
+        } catch (BKException e) {
+            LOG.error("Test failed", e);
+            fail("Test failed due to BookKeeper exception");
+        } catch (InterruptedException e) {
+            LOG.error("Test failed", e);
+            fail("Test failed due to interruption");
+        } 
+    }
     
     @Test
     public void testMultiLedger() throws IOException {
@@ -473,159 +476,160 @@ public class BookieReadWriteTest
                 
                 assertTrue("Checking if entry " + i + " has zero bytes", result.capacity() == 0);
             }
-            
             lh2.close();
         } catch (KeeperException e) {
-            e.printStackTrace();
+            LOG.error("Test failed", e);
+            fail("Test failed due to ZooKeeper exception");
         } catch (BKException e) {
-            e.printStackTrace();
+            LOG.error("Test failed", e);
+            fail("Test failed due to BookKeeper exception");
         } catch (InterruptedException e) {
-            e.printStackTrace();
-        } //catch (NoSuchAlgorithmException e) {
-           // e.printStackTrace();
-        //}
+            LOG.error("Test failed", e);
+            fail("Test failed due to interruption");
+        } 
     }
     
     
-	public void addComplete(int rc, 
-	        LedgerHandle lh, 
-	        long entryId, 
-	        Object ctx) {
-		SyncObj x = (SyncObj) ctx;
-		synchronized (x) {
-			x.counter++;
-			x.notify();
-		}
-	}
+    public void addComplete(int rc, 
+            LedgerHandle lh, 
+            long entryId, 
+            Object ctx) {
+        SyncObj x = (SyncObj) ctx;
+        synchronized (x) {
+            x.counter++;
+            x.notify();
+        }
+    }
 
-	public void readComplete(int rc, 
-	        LedgerHandle lh, 
-	        LedgerSequence seq,
-			Object ctx) {
-		ls = seq;				
-		synchronized (sync) {
-			sync.value = true;
-			sync.notify();
-		}
-		
-	}
-	 
-	protected void setUp() throws IOException {
-		LOG.addAppender(ca);
-		LOG.setLevel((Level) Level.DEBUG);
-		
-		// create a ZooKeeper server(dataDir, dataLogDir, port)
-		LOG.debug("Running ZK server");
-		//ServerStats.registerAsConcrete();
-		ClientBase.setupTestEnv();
-		ZkTmpDir = File.createTempFile("zookeeper", "test");
+    public void readComplete(int rc, 
+            LedgerHandle lh, 
+            LedgerSequence seq,
+            Object ctx) {
+        ls = seq;               
+        synchronized (sync) {
+            sync.value = true;
+            sync.notify();
+        }
+        
+    }
+     
+    protected void setUp() throws IOException, InterruptedException {
+        LOG.addAppender(ca);
+        LOG.setLevel((Level) Level.DEBUG);
+        
+        // create a ZooKeeper server(dataDir, dataLogDir, port)
+        LOG.debug("Running ZK server");
+        //ServerStats.registerAsConcrete();
+        ClientBase.setupTestEnv();
+        ZkTmpDir = File.createTempFile("zookeeper", "test");
         ZkTmpDir.delete();
         ZkTmpDir.mkdir();
-		    
-		try {
-			zks = new ZooKeeperServer(ZkTmpDir, ZkTmpDir, ZooKeeperDefaultPort);
-			serverFactory =  new NIOServerCnxn.Factory(ZooKeeperDefaultPort);
-			serverFactory.startup(zks);
-		} catch (IOException e1) {
-			// TODO Auto-generated catch block
-			e1.printStackTrace();
-		} catch (InterruptedException e) {
-			// TODO Auto-generated catch block
-			e.printStackTrace();
-		}
+            
+        try {
+            zks = new ZooKeeperServer(ZkTmpDir, ZkTmpDir, ZooKeeperDefaultPort);
+            serverFactory =  new NIOServerCnxn.Factory(ZooKeeperDefaultPort);
+            serverFactory.startup(zks);
+        } catch (IOException e1) {
+            // TODO Auto-generated catch block
+            e1.printStackTrace();
+        } catch (InterruptedException e) {
+            // TODO Auto-generated catch block
+            e.printStackTrace();
+        }
         boolean b = ClientBase.waitForServerUp(HOSTPORT, ClientBase.CONNECTION_TIMEOUT);
-		
+        
         LOG.debug("Server up: " + b);
         
-		// create a zookeeper client
-		LOG.debug("Instantiate ZK Client");
-		zkc = new ZooKeeper("127.0.0.1", ZooKeeperDefaultPort, new emptyWatcher());
-		
-		//initialize the zk client with values
-		try {
-			zkc.create("/ledgers", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-			zkc.create("/ledgers/available", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-			zkc.create("/ledgers/available/127.0.0.1:" + Integer.toString(initialPort), new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-			zkc.create("/ledgers/available/127.0.0.1:" + Integer.toString(initialPort + 1), new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-			zkc.create("/ledgers/available/127.0.0.1:" + Integer.toString(initialPort + 2), new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-		} catch (KeeperException e) {
-			// TODO Auto-generated catch block
-			e.printStackTrace();
-		} catch (InterruptedException e) {
-			// TODO Auto-generated catch block
-			e.printStackTrace();
-		}
-		
-		// Create Bookie Servers (B1, B2, B3)
-		tmpDirB1 = File.createTempFile("bookie1", "test");
+        // create a zookeeper client
+        LOG.debug("Instantiate ZK Client");
+        zkc = new ZooKeeper("127.0.0.1", ZooKeeperDefaultPort, new emptyWatcher());
+        
+        //initialize the zk client with values
+        try {
+            zkc.create("/ledgers", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+            zkc.create("/ledgers/available", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+            zkc.create("/ledgers/available/127.0.0.1:" + Integer.toString(initialPort), new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+            zkc.create("/ledgers/available/127.0.0.1:" + Integer.toString(initialPort + 1), new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+            zkc.create("/ledgers/available/127.0.0.1:" + Integer.toString(initialPort + 2), new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        } catch (KeeperException e) {
+            // TODO Auto-generated catch block
+            e.printStackTrace();
+        } catch (InterruptedException e) {
+            // TODO Auto-generated catch block
+            e.printStackTrace();
+        }
+        
+        // Create Bookie Servers (B1, B2, B3)
+        tmpDirB1 = File.createTempFile("bookie1", "test");
         tmpDirB1.delete();
         tmpDirB1.mkdir();
-		 
-		bs1 = new BookieServer(initialPort, tmpDirB1, new File[]{tmpDirB1});
-		bs1.start();
-		
-		tmpDirB2 = File.createTempFile("bookie2", "test");
+         
+        bs1 = new BookieServer(initialPort, tmpDirB1, new File[]{tmpDirB1});
+        bs1.start();
+        
+        tmpDirB2 = File.createTempFile("bookie2", "test");
         tmpDirB2.delete();
         tmpDirB2.mkdir();
-		    
-		bs2 = new BookieServer(initialPort + 1, tmpDirB2, new File[]{tmpDirB2});
-		bs2.start();
+            
+        bs2 = new BookieServer(initialPort + 1, tmpDirB2, new File[]{tmpDirB2});
+        bs2.start();
 
-		tmpDirB3 = File.createTempFile("bookie3", "test");
+        tmpDirB3 = File.createTempFile("bookie3", "test");
         tmpDirB3.delete();
         tmpDirB3.mkdir();
         
-		bs3 = new BookieServer(initialPort + 2, tmpDirB3, new File[]{tmpDirB3});
-		bs3.start();
-		
-		rng = new Random(System.currentTimeMillis());	// Initialize the Random Number Generator 
-		entries = new ArrayList<byte[]>(); // initialize the  entries list
-		entriesSize = new ArrayList<Integer>(); 
-		sync = new SyncObj(); // initialize the synchronization data structure
-	}
-	
-	protected void tearDown(){
-		LOG.info("TearDown");
+        bs3 = new BookieServer(initialPort + 2, tmpDirB3, new File[]{tmpDirB3});
+        bs3.start();
+        
+        rng = new Random(System.currentTimeMillis());   // Initialize the Random Number Generator 
+        entries = new ArrayList<byte[]>(); // initialize the  entries list
+        entriesSize = new ArrayList<Integer>(); 
+        sync = new SyncObj(); // initialize the synchronization data structure
+        zkc.close();
+    }
+    
+    protected void tearDown(){
+        LOG.info("TearDown");
 
-		//shutdown bookie servers 
-		try {
-			bs1.shutdown();
-			bs2.shutdown();
-			bs3.shutdown();
-		} catch (InterruptedException e) {
-			e.printStackTrace();
-		}
-		cleanUpDir(tmpDirB1);
-		cleanUpDir(tmpDirB2);
-		cleanUpDir(tmpDirB3);
-		
-		//shutdown ZK server
-		serverFactory.shutdown();
-		assertTrue("waiting for server down",
+        //shutdown bookie servers 
+        try {
+            bs1.shutdown();
+            bs2.shutdown();
+            bs3.shutdown();
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+        }
+        cleanUpDir(tmpDirB1);
+        cleanUpDir(tmpDirB2);
+        cleanUpDir(tmpDirB3);
+        
+        //shutdown ZK server
+        serverFactory.shutdown();
+        assertTrue("waiting for server down",
                 ClientBase.waitForServerDown(HOSTPORT,
                                              ClientBase.CONNECTION_TIMEOUT));
-		//ServerStats.unregister();
-		cleanUpDir(ZkTmpDir);
-		
-	}
+        //ServerStats.unregister();
+        cleanUpDir(ZkTmpDir);
+        
+    }
 
-	/*	Clean up a directory recursively */
-	protected boolean cleanUpDir(File dir){
-		if (dir.isDirectory()) {
-			LOG.info("Cleaning up " + dir.getName());
+    /*  Clean up a directory recursively */
+    protected boolean cleanUpDir(File dir){
+        if (dir.isDirectory()) {
+            LOG.info("Cleaning up " + dir.getName());
             String[] children = dir.list();
             for (String string : children) {
-				boolean success = cleanUpDir(new File(dir, string));
-				if (!success) return false;
-			}
+                boolean success = cleanUpDir(new File(dir, string));
+                if (!success) return false;
+            }
         }
         // The directory is now empty so delete it
-        return dir.delete();		
-	}
+        return dir.delete();        
+    }
 
-	/*	User for testing purposes, void */
-	class emptyWatcher implements Watcher{
-		public void process(WatchedEvent event) {}
-	}
+    /*  Used for testing purposes; no-op watcher */
+    class emptyWatcher implements Watcher{
+        public void process(WatchedEvent event) {}
+    }
 
 }

+ 10 - 16
src/contrib/bookkeeper/test/org/apache/bookkeeper/test/CloseTest.java

@@ -141,6 +141,7 @@ implements Watcher {
             zk.create("/ledgers/available/" + BOOKIEADDR1, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT );
             zk.create("/ledgers/available/" + BOOKIEADDR2, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT );
             zk.create("/ledgers/available/" + BOOKIEADDR3, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT );
+            zk.close();
         } catch (KeeperException ke) {
             LOG.error(ke);
             fail("Couldn't execute ZooKeeper start procedure");
@@ -230,6 +231,9 @@ implements Watcher {
         } catch(InterruptedException e){
             LOG.error("Interrupted when adding entry", e);
             fail("Couldn't finish adding entries");
+        } catch(BKException e){
+            LOG.error("BookKeeper exception", e);
+            fail("BookKeeper exception when adding entries");
         }
         
         try{
@@ -249,6 +253,9 @@ implements Watcher {
         } catch(InterruptedException e){
             LOG.error("Interrupted when adding entry", e);
             fail("Couldn't finish adding entries");
+        } catch(BKException e){
+            LOG.error("BookKeeper exception", e);
+            fail("CBookKeeper exception while adding entries");
         }
         
         try{
@@ -270,6 +277,9 @@ implements Watcher {
         } catch(InterruptedException e){
             LOG.error("Interrupted when adding entry", e);
             fail("Couldn't finish adding entries");
+        } catch(BKException e){
+            LOG.error("BookKeeper exception", e);
+            fail("BookKeeper exception when adding entries");
         }
         
         try{
@@ -279,22 +289,6 @@ implements Watcher {
             LOG.error(e);
             fail("Exception while closing ledger 4");
         }
-        /*
-            LedgerHandle afterlh = bk.openLedger(beforelh.getId(), "".getBytes());
-            
-        } catch (KeeperException e) {
-            LOG.error("Error when opening ledger", e);
-            fail("Couldn't open ledger");
-        } /* catch (InterruptedException ie) {
-            LOG.error("Interrupted exception", ie);
-            fail("Failure due to interrupted exception");
-        } catch (IOException ioe) {
-            LOG.error("IO Exception", ioe);
-            fail("Failure due to IO exception");
-        } catch (BKException bke){
-            LOG.error("BookKeeper error", bke);
-            fail("BookKeeper error");
-        }*/
     }      
 }
     

+ 9 - 19
src/contrib/bookkeeper/test/org/apache/bookkeeper/test/LedgerRecoveryTest.java

@@ -140,6 +140,7 @@ implements Watcher {
             zk.create("/ledgers/available/" + BOOKIEADDR1, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT );
             zk.create("/ledgers/available/" + BOOKIEADDR2, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT );
             zk.create("/ledgers/available/" + BOOKIEADDR3, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT );
+            zk.close();
         } catch (KeeperException ke) {
             LOG.error(ke);
             fail("Couldn't execute ZooKeeper start procedure");
@@ -219,21 +220,16 @@ implements Watcher {
             for(int i = 0; i < 1000; i++){
                 beforelh.addEntry(tmp.getBytes());
             }
+            
+            //bk.resetLedger(beforelh);
         } catch(InterruptedException e){
             LOG.error("Interrupted when adding entry", e);
             fail("Couldn't finish adding entries");
+        } catch(BKException e){
+            LOG.error("BookKeeper exception", e);
+            fail("BookKeeper exception while adding entries");
         }
         
-        ///*
-        // * Sleep.
-        // */
-        //try{
-        //    Thread.sleep(2000);
-        //} catch(InterruptedException e){
-        //    LOG.error("Interrupted while sleeping", e);
-        //    fail("Couldn't finish sleeping");
-        //}
-        
         /*
          * Try to open ledger.
          */
@@ -307,17 +303,11 @@ implements Watcher {
         } catch(InterruptedException e){
             LOG.error("Interrupted when adding entry", e);
             fail("Couldn't finish adding entries");
+        } catch(BKException e){
+            LOG.error("BookKeeper exception", e);
+            fail("BookKeeper exception while adding entries");
         }
         
-        ///*
-        // * Sleep.
-        // */
-        //try{
-        //    Thread.sleep(2000);
-        //} catch(InterruptedException e){
-        //    LOG.error("Interrupted while sleeping", e);
-        //    fail("Couldn't finish sleeping");
-        //}
         
         /*
          * Try to open ledger.