Browse Source

ZOOKEEPER-596. The last logged zxid calculated by zookeeper servers could cause problems in leader election if data gets corrupted. (mahadev)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/zookeeper/trunk@892111 13f79535-47bb-0310-9956-ffa450edef68
Mahadev Konar 15 năm trước cách đây
mục cha
commit
3ba0a5450c
21 tập tin đã thay đổi với 761 bổ sung205 xóa
  1. 3 0
      CHANGES.txt
  2. 2 2
      src/java/main/org/apache/zookeeper/server/ConnectionBean.java
  3. 16 15
      src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java
  4. 4 4
      src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java
  5. 4 4
      src/java/main/org/apache/zookeeper/server/PrepRequestProcessor.java
  6. 3 3
      src/java/main/org/apache/zookeeper/server/SyncRequestProcessor.java
  7. 454 0
      src/java/main/org/apache/zookeeper/server/ZKDatabase.java
  8. 63 111
      src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java
  9. 1 1
      src/java/main/org/apache/zookeeper/server/persistence/FileSnap.java
  10. 3 2
      src/java/main/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java
  11. 1 1
      src/java/main/org/apache/zookeeper/server/quorum/Leader.java
  12. 6 5
      src/java/main/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java
  13. 5 5
      src/java/main/org/apache/zookeeper/server/quorum/Learner.java
  14. 12 10
      src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java
  15. 5 12
      src/java/main/org/apache/zookeeper/server/quorum/LearnerZooKeeperServer.java
  16. 3 2
      src/java/main/org/apache/zookeeper/server/quorum/ObserverZooKeeperServer.java
  17. 36 22
      src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java
  18. 2 0
      src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerMain.java
  19. 4 4
      src/java/test/org/apache/zookeeper/test/ACLTest.java
  20. 2 2
      src/java/test/org/apache/zookeeper/test/FLELostMessageTest.java
  21. 132 0
      src/java/test/org/apache/zookeeper/test/ZkDatabaseCorruptionTest.java

+ 3 - 0
CHANGES.txt

@@ -187,6 +187,9 @@ BUGFIXES:
   ZOOKEEPER-600. TODO pondering about allocation behavior in zkpython may be
   removed (gustavo via mahadev)
 
+  ZOOKEEPER-596. The last logged zxid calculated by zookeeper servers could
+  cause problems in leader election if data gets corrupted. (mahadev)
+
 IMPROVEMENTS:
   ZOOKEEPER-473. cleanup junit tests to eliminate false positives due to
   "socket reuse" and failure to close client (phunt via mahadev)

+ 2 - 2
src/java/main/org/apache/zookeeper/server/ConnectionBean.java

@@ -73,8 +73,8 @@ public class ConnectionBean implements ConnectionMXBean, ZKMBeanInfo {
     }
     
     public String[] getEphemeralNodes() {
-        if(zk.dataTree!=null){
-            String[] res=zk.dataTree.getEphemerals(connection.getSessionId())
+        if(zk.getZKDatabase()  !=null){
+            String[] res= zk.getZKDatabase().getEphemerals(connection.getSessionId())
                 .toArray(new String[0]);
             Arrays.sort(res);
             return res;

+ 16 - 15
src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java

@@ -99,7 +99,7 @@ public class FinalRequestProcessor implements RequestProcessor {
                 }
             }
             if (request.hdr != null) {
-                rc = zks.dataTree.processTxn(request.hdr, request.txn);
+                rc = zks.getZKDatabase().processTxn(request.hdr, request.txn);
                 if (request.type == OpCode.createSession) {
                     if (request.txn instanceof CreateSessionTxn) {
                         CreateSessionTxn cst = (CreateSessionTxn) request.txn;
@@ -116,7 +116,7 @@ public class FinalRequestProcessor implements RequestProcessor {
             }
             // do not add non quorum packets to the queue.
             if (Request.isQuorum(request.type)) {
-                zks.addCommittedProposal(request);
+                zks.getZKDatabase().addCommittedProposal(request);
             }
         }
 
@@ -168,7 +168,7 @@ public class FinalRequestProcessor implements RequestProcessor {
                         request.createTime, System.currentTimeMillis());
 
                 cnxn.sendResponse(new ReplyHeader(-2,
-                        zks.dataTree.lastProcessedZxid, 0), null, "response");
+                        zks.getZKDatabase().getDataTreeLastProcessedZxid(), 0), null, "response");
                 return;
             }
             case OpCode.createSession: {
@@ -229,7 +229,7 @@ public class FinalRequestProcessor implements RequestProcessor {
                 if (path.indexOf('\0') != -1) {
                     throw new KeeperException.BadArgumentsException();
                 }
-                Stat stat = zks.dataTree.statNode(path, existsRequest
+                Stat stat = zks.getZKDatabase().statNode(path, existsRequest
                         .getWatch() ? cnxn : null);
                 rsp = new ExistsResponse(stat);
                 break;
@@ -239,7 +239,7 @@ public class FinalRequestProcessor implements RequestProcessor {
                 GetDataRequest getDataRequest = new GetDataRequest();
                 ZooKeeperServer.byteBuffer2Record(request.request,
                         getDataRequest);
-                DataNode n = zks.dataTree.getNode(getDataRequest.getPath());
+                DataNode n = zks.getZKDatabase().getNode(getDataRequest.getPath());
                 if (n == null) {
                     throw new KeeperException.NoNodeException();
                 }
@@ -247,11 +247,11 @@ public class FinalRequestProcessor implements RequestProcessor {
                 synchronized(n) {
                     aclL = n.acl;
                 }
-                PrepRequestProcessor.checkACL(zks, zks.dataTree.convertLong(aclL),
+                PrepRequestProcessor.checkACL(zks, zks.getZKDatabase().convertLong(aclL),
                         ZooDefs.Perms.READ,
                         request.authInfo);
                 Stat stat = new Stat();
-                byte b[] = zks.dataTree.getData(getDataRequest.getPath(), stat,
+                byte b[] = zks.getZKDatabase().getData(getDataRequest.getPath(), stat,
                         getDataRequest.getWatch() ? cnxn : null);
                 rsp = new GetDataResponse(b, stat);
                 break;
@@ -263,7 +263,7 @@ public class FinalRequestProcessor implements RequestProcessor {
                 request.request.rewind();
                 ZooKeeperServer.byteBuffer2Record(request.request, setWatches);
                 long relativeZxid = setWatches.getRelativeZxid();
-                zks.dataTree.setWatches(relativeZxid, 
+                zks.getZKDatabase().setWatches(relativeZxid, 
                         setWatches.getDataWatches(), 
                         setWatches.getExistWatches(),
                         setWatches.getChildWatches(), cnxn);
@@ -276,7 +276,7 @@ public class FinalRequestProcessor implements RequestProcessor {
                         getACLRequest);
                 Stat stat = new Stat();
                 List<ACL> acl = 
-                    zks.dataTree.getACL(getACLRequest.getPath(), stat);
+                    zks.getZKDatabase().getACL(getACLRequest.getPath(), stat);
                 rsp = new GetACLResponse(acl, stat);
                 break;
             }
@@ -285,18 +285,19 @@ public class FinalRequestProcessor implements RequestProcessor {
                 GetChildrenRequest getChildrenRequest = new GetChildrenRequest();
                 ZooKeeperServer.byteBuffer2Record(request.request,
                         getChildrenRequest);
-                DataNode n = zks.dataTree.getNode(getChildrenRequest.getPath());
+                DataNode n = zks.getZKDatabase().getNode(getChildrenRequest.getPath());
                 if (n == null) {
                     throw new KeeperException.NoNodeException();
                 }
                 Long aclG;
                 synchronized(n) {
                     aclG = n.acl;
+                    
                 }
-                PrepRequestProcessor.checkACL(zks, zks.dataTree.convertLong(aclG), 
+                PrepRequestProcessor.checkACL(zks, zks.getZKDatabase().convertLong(aclG), 
                         ZooDefs.Perms.READ,
                         request.authInfo);
-                List<String> children = zks.dataTree.getChildren(
+                List<String> children = zks.getZKDatabase().getChildren(
                         getChildrenRequest.getPath(), null, getChildrenRequest
                                 .getWatch() ? cnxn : null);
                 rsp = new GetChildrenResponse(children);
@@ -308,7 +309,7 @@ public class FinalRequestProcessor implements RequestProcessor {
                 ZooKeeperServer.byteBuffer2Record(request.request,
                         getChildren2Request);
                 Stat stat = new Stat();
-                DataNode n = zks.dataTree.getNode(getChildren2Request.getPath());
+                DataNode n = zks.getZKDatabase().getNode(getChildren2Request.getPath());
                 if (n == null) {
                     throw new KeeperException.NoNodeException();
                 }
@@ -316,10 +317,10 @@ public class FinalRequestProcessor implements RequestProcessor {
                 synchronized(n) {
                     aclG = n.acl;
                 }
-                PrepRequestProcessor.checkACL(zks, zks.dataTree.convertLong(aclG), 
+                PrepRequestProcessor.checkACL(zks, zks.getZKDatabase().convertLong(aclG), 
                         ZooDefs.Perms.READ,
                         request.authInfo);
-                List<String> children = zks.dataTree.getChildren(
+                List<String> children = zks.getZKDatabase().getChildren(
                         getChildren2Request.getPath(), stat, getChildren2Request
                                 .getWatch() ? cnxn : null);
                 rsp = new GetChildren2Response(children, stat);

+ 4 - 4
src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java

@@ -687,13 +687,13 @@ public class NIOServerCnxn implements Watcher, ServerCnxn {
         if (zk == null) {
             throw new IOException("ZooKeeperServer not running");
         }
-        if (connReq.getLastZxidSeen() > zk.dataTree.lastProcessedZxid) {
+        if (connReq.getLastZxidSeen() > zk.getZKDatabase().getDataTreeLastProcessedZxid()) {
             String msg = "Refusing session request for client "
                 + sock.socket().getRemoteSocketAddress()
                 + " as it has seen zxid 0x"
                 + Long.toHexString(connReq.getLastZxidSeen())
                 + " our last zxid is 0x"
-                + Long.toHexString(zk.dataTree.lastProcessedZxid)
+                + Long.toHexString(zk.getZKDatabase().getDataTreeLastProcessedZxid())
                 + " client must try another server";
 
             LOG.info(msg);
@@ -800,7 +800,7 @@ public class NIOServerCnxn implements Watcher, ServerCnxn {
                 sb.append("SessionTracker dump: \n");
                 sb.append(zk.sessionTracker.toString()).append("\n");
                 sb.append("ephemeral nodes dump:\n");
-                sb.append(zk.dataTree.dumpEphemerals()).append("\n");
+                sb.append(zk.getZKDatabase().dumpEphemerals()).append("\n");
                 sendBuffer(ByteBuffer.wrap(sb.toString().getBytes()));
             }
             k.interestOps(SelectionKey.OP_WRITE);
@@ -825,7 +825,7 @@ public class NIOServerCnxn implements Watcher, ServerCnxn {
                     sb.append("\n");
                 }
                 sb.append(zk.serverStats().toString());
-                sb.append("Node count: ").append(zk.dataTree.getNodeCount()).
+                sb.append("Node count: ").append(zk.getZKDatabase().getNodeCount()).
                     append("\n");
             } else {
                 sb.append("ZooKeeperServer not running\n");

+ 4 - 4
src/java/main/org/apache/zookeeper/server/PrepRequestProcessor.java

@@ -132,7 +132,7 @@ public class PrepRequestProcessor extends Thread implements RequestProcessor {
             }
             */
             if (lastChange == null) {
-                DataNode n = zks.dataTree.getNode(path);
+                DataNode n = zks.getZKDatabase().getNode(path);
                 if (n != null) {
                     Long acl;
                     Set<String> children;
@@ -142,7 +142,7 @@ public class PrepRequestProcessor extends Thread implements RequestProcessor {
                     }
                     lastChange = new ChangeRecord(-1, path, n.stat, 
                         children != null ? children.size() : 0, 
-                            zks.dataTree.convertLong(acl));
+                            zks.getZKDatabase().convertLong(acl));
                 }
             }
         }
@@ -278,7 +278,7 @@ public class PrepRequestProcessor extends Thread implements RequestProcessor {
                 path = deleteRequest.getPath();
                 lastSlash = path.lastIndexOf('/');
                 if (lastSlash == -1 || path.indexOf('\0') != -1
-                        || zks.dataTree.isSpecialPath(path)) {
+                        || zks.getZKDatabase().isSpecialPath(path)) {
                     throw new KeeperException.BadArgumentsException(path);
                 }
                 parentPath = path.substring(0, lastSlash);
@@ -366,7 +366,7 @@ public class PrepRequestProcessor extends Thread implements RequestProcessor {
                 // queues up this operation without being the session owner.
                 // this request is the last of the session so it should be ok
                 //zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
-                HashSet<String> es = zks.dataTree
+                HashSet<String> es = zks.getZKDatabase()
                         .getEphemerals(request.sessionId);
                 synchronized (zks.outstandingChanges) {
                     for (ChangeRecord c : zks.outstandingChanges) {

+ 3 - 3
src/java/main/org/apache/zookeeper/server/SyncRequestProcessor.java

@@ -104,12 +104,12 @@ public class SyncRequestProcessor extends Thread implements RequestProcessor {
                 }
                 if (si != null) {
                     // track the number of records written to the log
-                    if (zks.getLogWriter().append(si)) {
+                    if (zks.getZKDatabase().append(si)) {
                         logCount++;
                         if (logCount > (snapCount / 2 + randRoll)) {
                             randRoll = r.nextInt(snapCount/2);
                             // roll the log
-                            zks.getLogWriter().rollLog();
+                            zks.getZKDatabase().rollLog();
                             // take a snapshot
                             if (snapInProcess != null && snapInProcess.isAlive()) {
                                 LOG.warn("Too busy to snap, skipping");
@@ -155,7 +155,7 @@ public class SyncRequestProcessor extends Thread implements RequestProcessor {
         if (toFlush.isEmpty())
             return;
 
-        zks.getLogWriter().commit();
+        zks.getZKDatabase().commit();
         while (!toFlush.isEmpty()) {
             Request i = toFlush.remove();
             nextProcessor.processRequest(i);

+ 454 - 0
src/java/main/org/apache/zookeeper/server/ZKDatabase.java

@@ -0,0 +1,454 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.jute.BinaryOutputArchive;
+import org.apache.jute.InputArchive;
+import org.apache.jute.OutputArchive;
+import org.apache.jute.Record;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.KeeperException.NoNodeException;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Stat;
+import org.apache.zookeeper.server.DataTree.ProcessTxnResult;
+import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
+import org.apache.zookeeper.server.persistence.FileTxnSnapLog.PlayBackListener;
+import org.apache.zookeeper.server.quorum.Leader;
+import org.apache.zookeeper.server.quorum.QuorumPacket;
+import org.apache.zookeeper.server.quorum.Leader.Proposal;
+import org.apache.zookeeper.server.util.SerializeUtils;
+import org.apache.zookeeper.txn.TxnHeader;
+
+/**
+ * This class maintains the in memory database of zookeeper
+ * server states that includes the sessions, datatree and the
+ * committed logs. It is booted up  after reading the logs
+ * and snapshots from the disk.
+ */
+public class ZKDatabase {
+    
+    private static final Logger LOG = Logger.getLogger(ZKDatabase.class);
+    
+    /**
+     * make sure on a clear you take care of 
+     * all these members.
+     */
+    protected DataTree dataTree;
+    protected ConcurrentHashMap<Long, Integer> sessionsWithTimeouts;
+    protected FileTxnSnapLog snapLog;
+    protected long minCommittedLog, maxCommittedLog;
+    public static final int commitLogCount = 500;
+    protected static int commitLogBuffer = 700;
+    protected LinkedList<Proposal> committedLog = new LinkedList<Proposal>();
+    volatile private boolean initialized = false;
+    
+    /**
+     * the filetxnsnaplog that this zk database
+     * maps to. There is a one to one relationship
+     * between a filetxnsnaplog and zkdatabase.
+     * @param snapLog the FileTxnSnapLog mapping this zkdatabase
+     */
+    public ZKDatabase(FileTxnSnapLog snapLog) {
+        dataTree = new DataTree();
+        sessionsWithTimeouts = new ConcurrentHashMap<Long, Integer>();
+        this.snapLog = snapLog;
+    }
+    
+    /**
+     * checks to see if the zk database has been
+     * initialized or not.
+     * @return true if zk database is initialized and false if not
+     */
+    public boolean isInitialized() {
+        return initialized;
+    }
+    
+    /**
+     * clear the zkdatabase. 
+     * Note to developers - be careful to see that 
+     * the clear method does clear out all the
+     * data structures in zkdatabase.
+     */
+    public void clear() {
+        minCommittedLog = 0;
+        maxCommittedLog = 0;
+        /* to be safe we just create a new 
+         * datatree.
+         */
+        dataTree = new DataTree();
+        sessionsWithTimeouts.clear();
+        committedLog.clear();
+        initialized = false;
+    }
+    
+    /**
+     * the datatree for this zkdatabase
+     * @return the datatree for this zkdatabase
+     */
+    public DataTree getDataTree() {
+        return this.dataTree;
+    }
+ 
+    /**
+     * the committed log for this zk database
+     * @return the committed log for this zkdatabase
+     */
+    public long getmaxCommittedLog() {
+        return maxCommittedLog;
+    }
+    
+    
+    /**
+     * the minimum committed transaction log
+     * available in memory
+     * @return the minimum committed transaction
+     * log available in memory
+     */
+    public long getminCommittedLog() {
+        return minCommittedLog;
+    }
+    
+    public LinkedList<Proposal> getCommittedLog() {
+        return this.committedLog;
+    }
+    
+    /**
+     * get the last processed zxid from a datatree
+     * @return the last processed zxid of a datatree
+     */
+    public long getDataTreeLastProcessedZxid() {
+        return dataTree.lastProcessedZxid;
+    }
+    
+    /**
+     * set the datatree initialized or not
+     * @param b set the datatree initialized to b
+     */
+    public void setDataTreeInit(boolean b) {
+        dataTree.initialized = b;
+    }
+    
+    /**
+     * return the sessions in the datatree
+     * @return the data tree sessions
+     */
+    public Collection<Long> getSessions() {
+        return dataTree.getSessions();
+    }
+    
+    /**
+     * get sessions with timeouts
+     * @return the hashmap of sessions with timeouts
+     */
+    public ConcurrentHashMap<Long, Integer> getSessionWithTimeOuts() {
+        return sessionsWithTimeouts;
+    }
+
+    
+    /**
+     * load the database from the disk onto memory and also add 
+     * the transactions to the committedlog in memory.
+     * @return the last valid zxid on disk
+     * @throws IOException
+     */
+    public long loadDataBase() throws IOException {
+        PlayBackListener listener=new PlayBackListener(){
+            public void onTxnLoaded(TxnHeader hdr,Record txn){
+                Request r = new Request(null, 0, hdr.getCxid(),hdr.getType(),
+                        null, null);
+                r.txn = txn;
+                r.hdr = hdr;
+                r.zxid = hdr.getZxid();
+                addCommittedProposal(r);
+            }
+        };
+        
+        long zxid = snapLog.restore(dataTree,sessionsWithTimeouts,listener);
+        initialized = true;
+        return zxid;
+    }
+    
+    /**
+     * maintains a list of last <i>committedLog</i>
+     *  or so committed requests. This is used for
+     * fast follower synchronization.
+     * @param request committed request
+     */
+    public void addCommittedProposal(Request request) {
+        synchronized (committedLog) {
+            if (committedLog.size() > commitLogCount) {
+                committedLog.removeFirst();
+                minCommittedLog = committedLog.getFirst().packet.getZxid();
+            }
+            if (committedLog.size() == 0) {
+                minCommittedLog = request.zxid;
+                maxCommittedLog = request.zxid;
+            }
+
+            ByteArrayOutputStream baos = new ByteArrayOutputStream();
+            BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
+            try {
+                request.hdr.serialize(boa, "hdr");
+                if (request.txn != null) {
+                    request.txn.serialize(boa, "txn");
+                }
+                baos.close();
+            } catch (IOException e) {
+                LOG.error("This really should be impossible", e);
+            }
+            QuorumPacket pp = new QuorumPacket(Leader.PROPOSAL, request.zxid,
+                    baos.toByteArray(), null);
+            Proposal p = new Proposal();
+            p.packet = pp;
+            p.request = request;
+            committedLog.add(p);
+            maxCommittedLog = p.packet.getZxid();
+        }
+    }
+
+    
+    /**
+     * remove a cnxn from the datatree
+     * @param cnxn the cnxn to remove from the datatree
+     */
+    public void removeCnxn(ServerCnxn cnxn) {
+        dataTree.removeCnxn(cnxn);
+    }
+
+    /**
+     * kill a given session in the datatree
+     * @param sessionId the session id to be killed
+     * @param zxid the zxid of kill session transaction
+     */
+    public void killSession(long sessionId, long zxid) {
+        dataTree.killSession(sessionId, zxid);
+    }
+
+    /**
+     * get a string dump of all the ephemerals in
+     * the datatree
+     * @return the string dump of ephemerals
+     */
+    public String dumpEphemerals() {
+        return dataTree.dumpEphemerals();
+    }
+
+    /**
+     * the node count of the datatree
+     * @return the node count of datatree
+     */
+    public int getNodeCount() {
+        return dataTree.getNodeCount();
+    }
+
+    /**
+     * the paths for  ephemeral session id 
+     * @param sessionId the session id for which paths match to 
+     * @return the paths for a session id
+     */
+    public HashSet<String> getEphemerals(long sessionId) {
+        return dataTree.getEphemerals(sessionId);
+    }
+
+    /**
+     * the last processed zxid in the datatree
+     * @param zxid the last processed zxid in the datatree
+     */
+    public void setlastProcessedZxid(long zxid) {
+        dataTree.lastProcessedZxid = zxid;
+    }
+
+    /**
+     * the process txn on the data
+     * @param hdr the txnheader for the txn
+     * @param txn the transaction that needs to be processed
+     * @return the result of processing the transaction on this
+     * datatree/zkdatabase
+     */
+    public ProcessTxnResult processTxn(TxnHeader hdr, Record txn) {
+        return dataTree.processTxn(hdr, txn);
+    }
+
+    /**
+     * stat the path 
+     * @param path the path for which stat is to be done
+     * @param serverCnxn the servercnxn attached to this request
+     * @return the stat of this node
+     * @throws KeeperException.NoNodeException
+     */
+    public Stat statNode(String path, ServerCnxn serverCnxn) throws KeeperException.NoNodeException {
+        return dataTree.statNode(path, serverCnxn);
+    }
+    
+    /**
+     * get the datanode for this path
+     * @param path the path to lookup
+     * @return the datanode for getting the path
+     */
+    public DataNode getNode(String path) {
+      return dataTree.getNode(path);
+    }
+
+    /**
+     * convert from long to the acl entry
+     * @param aclL the long for which to get the acl
+     * @return the acl corresponding to this long entry
+     */
+    public List<ACL> convertLong(Long aclL) {
+        return dataTree.convertLong(aclL);
+    }
+
+    /**
+     * get data and stat for a path 
+     * @param path the path being queried
+     * @param stat the stat for this path
+     * @param watcher the watcher function
+     * @return
+     * @throws KeeperException.NoNodeException
+     */
+    public byte[] getData(String path, Stat stat, Watcher watcher) 
+    throws KeeperException.NoNodeException {
+        return dataTree.getData(path, stat, watcher);
+    }
+
+    /**
+     * set watches on the datatree
+     * @param relativeZxid the relative zxid that client has seen
+     * @param dataWatches the data watches the client wants to reset
+     * @param existWatches the exists watches the client wants to reset
+     * @param childWatches the child watches the client wants to reset
+     * @param watcher the watcher function
+     */
+    public void setWatches(long relativeZxid, List<String> dataWatches,
+            List<String> existWatches, List<String> childWatches, Watcher watcher) {
+        dataTree.setWatches(relativeZxid, dataWatches, existWatches, childWatches, watcher);
+    }
+    
+    /**
+     * get acl for a path
+     * @param path the path to query for acl
+     * @param stat the stat for the node
+     * @return the acl list for this path
+     * @throws NoNodeException
+     */
+    public List<ACL> getACL(String path, Stat stat) throws NoNodeException {
+        return dataTree.getACL(path, stat);
+    }
+
+    /**
+     * get children list for this path
+     * @param path the path of the node
+     * @param stat the stat of the node
+     * @param watcher the watcher function for this path
+     * @return the list of children for this path
+     * @throws KeeperException.NoNodeException
+     */
+    public List<String> getChildren(String path, Stat stat, Watcher watcher)
+    throws KeeperException.NoNodeException {
+        return dataTree.getChildren(path, stat, watcher);
+    }
+
+    /**
+     * check if the path is special or not
+     * @param path the input path
+     * @return true if path is special and false if not
+     */
+    public boolean isSpecialPath(String path) {
+        return dataTree.isSpecialPath(path);
+    }
+
+    /**
+     * get the acl size of the datatree
+     * @return the acl size of the datatree
+     */
+    public int getAclSize() {
+        return dataTree.longKeyMap.size();
+    }
+
+    /**
+     * truncate the zkdatabase to this zxid
+     * @param zxid the zxid to truncate zk database to
+     * @return true if the truncate is succesful and false if not
+     * @throws IOException
+     */
+    public boolean truncateLog(long zxid) throws IOException {
+        clear();
+        boolean truncated = this.snapLog.truncateLog(zxid);
+        loadDataBase();
+        return truncated;
+    }
+    
+    /**
+     * deserialize a snapshot from an input archive 
+     * @param ia the input archive you want to deserialize from
+     * @throws IOException
+     */
+    public void deserializeSnapshot(InputArchive ia) throws IOException {
+        clear();
+        SerializeUtils.deserializeSnapshot(getDataTree(),ia,getSessionWithTimeOuts());
+        initialized = true;
+    }   
+    
+    /**
+     * serialize the snapshot
+     * @param oa the output archive to which the snapshot needs to be serialized
+     * @throws IOException
+     * @throws InterruptedException
+     */
+    public void serializeSnapshot(OutputArchive oa) throws IOException,
+    InterruptedException {
+        SerializeUtils.serializeSnapshot(getDataTree(), oa, getSessionWithTimeOuts());
+    }
+
+    /**
+     * append to the underlying transaction log 
+     * @param si the request to append
+     * @return true if the append was succesfull and false if not
+     */
+    public boolean append(Request si) throws IOException {
+        return this.snapLog.append(si);
+    }
+
+    /**
+     * roll the underlying log
+     */
+    public void rollLog() throws IOException {
+        this.snapLog.rollLog();
+    }
+
+    /**
+     * commit to the underlying transaction log
+     * @throws IOException
+     */
+    public void commit() throws IOException {
+        this.snapLog.commit();
+    }
+    
+    
+}

+ 63 - 111
src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java

@@ -24,6 +24,7 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
@@ -96,16 +97,10 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
 
     public static final int DEFAULT_TICK_TIME = 3000;
     protected int tickTime = DEFAULT_TICK_TIME;
-
-    public static final int commitLogCount = 500;
-    public int commitLogBuffer = 700;
-    public final LinkedList<Proposal> committedLog = new LinkedList<Proposal>();
-    public long minCommittedLog, maxCommittedLog;
-    private DataTreeBuilder treeBuilder;
-    public DataTree dataTree;
     protected SessionTracker sessionTracker;
     private FileTxnSnapLog txnLogFactory = null;
-    protected ConcurrentHashMap<Long, Integer> sessionsWithTimeouts;
+    private ConcurrentHashMap<Long, Integer> sessionsWithTimeouts;
+    private ZKDatabase zkDb;
     protected long hzxid = 0;
     public final static Exception ok = new Exception("No prob");
     protected RequestProcessor firstProcessor;
@@ -128,7 +123,7 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
     private final ServerStats serverStats;
 
     void removeCnxn(ServerCnxn cnxn) {
-        dataTree.removeCnxn(cnxn);
+        zkDb.removeCnxn(cnxn);
     }
  
     /**
@@ -140,27 +135,38 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
      */
     public ZooKeeperServer() {
         serverStats = new ServerStats(this);
-        treeBuilder = new BasicDataTreeBuilder();
     }
     
     /**
      * Creates a ZooKeeperServer instance. It sets everything up, but doesn't
      * actually start listening for clients until run() is invoked.
-     *
+     * 
      * @param dataDir the directory to put the data
      * @throws IOException
      */
-    public ZooKeeperServer(FileTxnSnapLog txnLogFactory, int tickTime,
-            DataTreeBuilder treeBuilder) throws IOException {
+    public ZooKeeperServer(FileTxnSnapLog txnLogFactory, int tickTime, 
+            DataTreeBuilder treeBuilder, ZKDatabase zkDb) throws IOException {
         serverStats = new ServerStats(this);
-        this.treeBuilder = treeBuilder;
-
         this.txnLogFactory = txnLogFactory;
+        this.zkDb = zkDb;
         this.tickTime = tickTime;
         
-        LOG.info("Created server");
+        LOG.info("Created server with tickTime " + tickTime + " datadir " + 
+                txnLogFactory.getDataDir() + " snapdir " + txnLogFactory.getSnapDir());
     }
 
+    /**
+     * creates a zookeeperserver instance. 
+     * @param txnLogFactory the file transaction snapshot logging class
+     * @param tickTime the ticktime for the server
+     * @param treeBuilder the datatree builder
+     * @throws IOException
+     */
+    public ZooKeeperServer(FileTxnSnapLog txnLogFactory, int tickTime,
+            DataTreeBuilder treeBuilder) throws IOException {
+        this(txnLogFactory, tickTime, treeBuilder, new ZKDatabase(txnLogFactory));
+    }
+    
     public ServerStats serverStats() {
         return serverStats;
     }
@@ -172,7 +178,7 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
      */
     public ZooKeeperServer(File snapDir, File logDir, int tickTime)
             throws IOException {
-        this(new FileTxnSnapLog(snapDir,logDir),
+        this( new FileTxnSnapLog(snapDir, logDir),
                 tickTime,new BasicDataTreeBuilder());
     }
 
@@ -182,84 +188,51 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
      * @throws IOException
      */
     public ZooKeeperServer(FileTxnSnapLog txnLogFactory,DataTreeBuilder treeBuilder) throws IOException {
-        this(txnLogFactory, DEFAULT_TICK_TIME, treeBuilder);
+        this(txnLogFactory, DEFAULT_TICK_TIME, treeBuilder, new ZKDatabase(txnLogFactory));
     }
 
+    /**
+     * get the zookeeper database for this server
+     * @return the zookeeper database for this server
+     */
+    public ZKDatabase getZKDatabase() {
+        return this.zkDb;
+    }
+    
+    /**
+     * set the zkdatabase for this zookeeper server
+     * @param zkDb
+     */
+    public void setZKDatabase(ZKDatabase zkDb) {
+       this.zkDb = zkDb;
+    }
+    
     /**
      *  Restore sessions and data
      */
     public void loadData() throws IOException, InterruptedException {
-        PlayBackListener listener=new PlayBackListener(){
-            public void onTxnLoaded(TxnHeader hdr,Record txn){
-                Request r = new Request(null, 0, hdr.getCxid(),hdr.getType(),
-                        null, null);
-                r.txn = txn;
-                r.hdr = hdr;
-                r.zxid = hdr.getZxid();
-                addCommittedProposal(r);
-            }
-        };
-        sessionsWithTimeouts = new ConcurrentHashMap<Long, Integer>();
-        dataTree = treeBuilder.build();
-        setZxid(txnLogFactory.restore(dataTree,sessionsWithTimeouts,listener));
+        zkDb.loadDataBase();
+        setZxid(zkDb.loadDataBase());
         // Clean up dead sessions
         LinkedList<Long> deadSessions = new LinkedList<Long>();
-        for (long session : dataTree.getSessions()) {
+        for (long session : zkDb.getSessions()) {
+            sessionsWithTimeouts = zkDb.getSessionWithTimeOuts();
             if (sessionsWithTimeouts.get(session) == null) {
                 deadSessions.add(session);
             }
         }
-        dataTree.initialized = true;
+        zkDb.setDataTreeInit(true);
         for (long session : deadSessions) {
             // XXX: Is lastProcessedZxid really the best thing to use?
-            killSession(session, dataTree.lastProcessedZxid);
+            killSession(session, zkDb.getDataTreeLastProcessedZxid());
         }
         // Make a clean snapshot
         takeSnapshot();
     }
 
-    /**
-     * maintains a list of last 500 or so committed requests. This is used for
-     * fast follower synchronization.
-     *
-     * @param request committed request
-     */
-
-    public void addCommittedProposal(Request request) {
-        synchronized (committedLog) {
-            if (committedLog.size() > commitLogCount) {
-                committedLog.removeFirst();
-                minCommittedLog = committedLog.getFirst().packet.getZxid();
-            }
-            if (committedLog.size() == 0) {
-                minCommittedLog = request.zxid;
-                maxCommittedLog = request.zxid;
-            }
-
-            ByteArrayOutputStream baos = new ByteArrayOutputStream();
-            BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
-            try {
-                request.hdr.serialize(boa, "hdr");
-                if (request.txn != null) {
-                    request.txn.serialize(boa, "txn");
-                }
-                baos.close();
-            } catch (IOException e) {
-                LOG.error("This really should be impossible", e);
-            }
-            QuorumPacket pp = new QuorumPacket(Leader.PROPOSAL, request.zxid,
-                    baos.toByteArray(), null);
-            Proposal p = new Proposal();
-            p.packet = pp;
-            p.request = request;
-            committedLog.add(p);
-            maxCommittedLog = p.packet.getZxid();
-        }
-    }
-
     public void takeSnapshot(){
         try {
-            txnLogFactory.save(dataTree, sessionsWithTimeouts);
+            txnLogFactory.save(zkDb.getDataTree(), zkDb.getSessionWithTimeOuts());
         } catch (IOException e) {
             LOG.fatal("Severe unrecoverable error, exiting", e);
             // This is a severe error that we cannot recover from,
@@ -268,18 +241,7 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
         }
     }
 
-    public void serializeSnapshot(OutputArchive oa) throws IOException,
-            InterruptedException {
-        SerializeUtils.serializeSnapshot(dataTree, oa, sessionsWithTimeouts);
-    }
-
-    public void deserializeSnapshot(InputArchive ia) throws IOException {
-        sessionsWithTimeouts = new ConcurrentHashMap<Long, Integer>();
-        dataTree = treeBuilder.build();
-
-        SerializeUtils.deserializeSnapshot(dataTree,ia,sessionsWithTimeouts);
-    }
-
+  
     /**
      * This should be called from a synchronized block on this!
      */
@@ -312,7 +274,7 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
     }
 
     protected void killSession(long sessionId, long zxid) {
-        dataTree.killSession(sessionId, zxid);
+        zkDb.killSession(sessionId, zxid);
         if (LOG.isTraceEnabled()) {
             ZooTrace.logTraceMessage(LOG, ZooTrace.SESSION_TRACE_MASK,
                                          "ZooKeeperServer --- killSession: 0x"
@@ -358,7 +320,7 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
             MBeanRegistry.getInstance().register(jmxServerBean, null);
             
             try {
-                jmxDataTreeBean = new DataTreeBean(dataTree);
+                jmxDataTreeBean = new DataTreeBean(zkDb.getDataTree());
                 MBeanRegistry.getInstance().register(jmxDataTreeBean, jmxServerBean);
             } catch (Exception e) {
                 LOG.warn("Failed to register with JMX", e);
@@ -371,7 +333,11 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
     }
     
     public void startup() throws IOException, InterruptedException {
-        if (dataTree == null) {
+        //check to see if zkDb is not null
+        if (zkDb == null) {
+            zkDb = new ZKDatabase(this.txnLogFactory);
+        }
+        if (!zkDb.isInitialized()) {
             loadData();
         }
         createSessionTracker();
@@ -395,7 +361,7 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
     }
 
     protected void createSessionTracker() {
-        sessionTracker = new SessionTrackerImpl(this, sessionsWithTimeouts,
+        sessionTracker = new SessionTrackerImpl(this, zkDb.getSessionWithTimeOuts(),
                 tickTime, 1);
         ((SessionTrackerImpl)sessionTracker).start();
     }
@@ -416,8 +382,8 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
         if (firstProcessor != null) {
             firstProcessor.shutdown();
         }
-        if (dataTree != null) {
-            dataTree.clear();
+        if (zkDb != null) {
+            zkDb.clear();
         }
 
         unregisterJMX();
@@ -638,7 +604,7 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
      * datatree
      */
     public long getLastProcessedZxid() {
-        return dataTree.lastProcessedZxid;
+        return zkDb.getDataTreeLastProcessedZxid();
     }
 
     /**
@@ -658,17 +624,9 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
      * @throws IOException
      */
     public void truncateLog(long zxid) throws IOException {
-        this.txnLogFactory.truncateLog(zxid);
+        this.zkDb.truncateLog(zxid);
     }
-    
-    /**
-     * the snapshot and logwriter for this instance
-     * @return
-     */
-    public FileTxnSnapLog getLogWriter() {
-        return this.txnLogFactory;
-    }
-    
+       
     public int getTickTime() {
         return tickTime;
     }
@@ -677,14 +635,6 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
         this.tickTime = tickTime;
     }
 
-    public DataTreeBuilder getTreeBuilder() {
-        return treeBuilder;
-    }
-
-    public void setTreeBuilder(DataTreeBuilder treeBuilder) {
-        this.treeBuilder = treeBuilder;
-    }
-    
     public int getClientPort() {
         return serverCnxnFactory != null ? serverCnxnFactory.ss.socket().getLocalPort() : -1;
     }
@@ -700,4 +650,6 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
     public String getState() {
         return "standalone";
     }
+
+    
 }

+ 1 - 1
src/java/main/org/apache/zookeeper/server/persistence/FileSnap.java

@@ -61,7 +61,7 @@ public class FileSnap implements SnapShot {
     /**
      * deserialize a data tree from the most recent snapshot
      * @return the zxid of the snapshot
-     */
+     */ 
     public long deserialize(DataTree dt, Map<Long, Integer> sessions)
             throws IOException {
         // we run through 100 snapshots (not all of them)

+ 3 - 2
src/java/main/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java

@@ -28,6 +28,7 @@ import org.apache.zookeeper.server.FinalRequestProcessor;
 import org.apache.zookeeper.server.Request;
 import org.apache.zookeeper.server.RequestProcessor;
 import org.apache.zookeeper.server.SyncRequestProcessor;
+import org.apache.zookeeper.server.ZKDatabase;
 import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
 import org.apache.zookeeper.txn.TxnHeader;
 
@@ -56,8 +57,8 @@ public class FollowerZooKeeperServer extends LearnerZooKeeperServer {
      * @throws IOException
      */
     FollowerZooKeeperServer(FileTxnSnapLog logFactory,QuorumPeer self,
-            DataTreeBuilder treeBuilder) throws IOException {
-        super(logFactory, self.tickTime,treeBuilder);
+            DataTreeBuilder treeBuilder, ZKDatabase zkDb) throws IOException {
+        super(logFactory, self.tickTime,treeBuilder, zkDb);
         this.self = self;        
         this.pendingSyncs = new ConcurrentLinkedQueue<Request>();
     }

+ 1 - 1
src/java/main/org/apache/zookeeper/server/quorum/Leader.java

@@ -279,7 +279,7 @@ public class Leader {
             long epoch = self.getLastLoggedZxid() >> 32L;
             epoch++;
             zk.setZxid(epoch << 32L);
-            zk.dataTree.lastProcessedZxid = zk.getZxid();
+            zk.getZKDatabase().setlastProcessedZxid(zk.getZxid());
             
             synchronized(this){
                 lastProposed = zk.getZxid();

+ 6 - 5
src/java/main/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java

@@ -28,6 +28,7 @@ import org.apache.zookeeper.server.PrepRequestProcessor;
 import org.apache.zookeeper.server.RequestProcessor;
 import org.apache.zookeeper.server.ServerCnxn;
 import org.apache.zookeeper.server.SessionTrackerImpl;
+import org.apache.zookeeper.server.ZKDatabase;
 import org.apache.zookeeper.server.ZooKeeperServer;
 import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
 
@@ -40,7 +41,7 @@ import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
  */
 public class LeaderZooKeeperServer extends ZooKeeperServer {
     private QuorumPeer self;
-
+    
     CommitProcessor commitProcessor;
 
     /**
@@ -49,8 +50,8 @@ public class LeaderZooKeeperServer extends ZooKeeperServer {
      * @throws IOException
      */
     LeaderZooKeeperServer(FileTxnSnapLog logFactory,QuorumPeer self,
-            DataTreeBuilder treeBuilder) throws IOException {
-        super(logFactory, self.tickTime,treeBuilder);
+            DataTreeBuilder treeBuilder, ZKDatabase zkDb) throws IOException {
+        super(logFactory, self.tickTime,treeBuilder, zkDb);
         this.self = self;
     }
 
@@ -80,7 +81,7 @@ public class LeaderZooKeeperServer extends ZooKeeperServer {
     
     @Override
     protected void createSessionTracker() {
-        sessionTracker = new SessionTrackerImpl(this, sessionsWithTimeouts,
+        sessionTracker = new SessionTrackerImpl(this, getZKDatabase().getSessionWithTimeOuts(),
                 tickTime, self.getId());
         ((SessionTrackerImpl)sessionTracker).start();
     }
@@ -94,7 +95,7 @@ public class LeaderZooKeeperServer extends ZooKeeperServer {
     protected void registerJMX() {
         // register with JMX
         try {
-            jmxDataTreeBean = new DataTreeBean(dataTree);
+            jmxDataTreeBean = new DataTreeBean(getZKDatabase().getDataTree());
             MBeanRegistry.getInstance().register(jmxDataTreeBean, jmxServerBean);
         } catch (Exception e) {
             LOG.warn("Failed to register with JMX", e);

+ 5 - 5
src/java/main/org/apache/zookeeper/server/quorum/Learner.java

@@ -280,12 +280,13 @@ public class Learner {
         synchronized (zk) {
             if (qp.getType() == Leader.DIFF) {
                 LOG.info("Getting a diff from the leader 0x" + Long.toHexString(qp.getZxid()));                
-                zk.loadData();
             }
             else if (qp.getType() == Leader.SNAP) {
                 LOG.info("Getting a snapshot from leader");
                 // The leader is going to dump the database
-                zk.deserializeSnapshot(leaderIs);
+                // clear our own database and read
+                zk.getZKDatabase().clear();
+                zk.getZKDatabase().deserializeSnapshot(leaderIs);
                 String signature = leaderIs.readString("signature");
                 if (!signature.equals("BenWasHere")) {
                     LOG.error("Missing signature. Got " + signature);
@@ -295,7 +296,7 @@ public class Learner {
                 //we need to truncate the log to the lastzxid of the leader
                 LOG.warn("Truncating log to get in sync with the leader 0x"
                         + Long.toHexString(qp.getZxid()));
-                boolean truncated=zk.getLogWriter().truncateLog(qp.getZxid());
+                boolean truncated=zk.getZKDatabase().truncateLog(qp.getZxid());
                 if (!truncated) {
                     // not able to truncate the log
                     LOG.fatal("Not able to truncate the log "
@@ -303,7 +304,6 @@ public class Learner {
                     System.exit(13);
                 }
 
-                zk.loadData();
             }
             else {
                 LOG.fatal("Got unexpected packet from leader "
@@ -311,7 +311,7 @@ public class Learner {
                 System.exit(13);
 
             }
-            zk.dataTree.lastProcessedZxid = newLeaderZxid;
+            zk.getZKDatabase().setlastProcessedZxid(newLeaderZxid);
         }
         ack.setZxid(newLeaderZxid & ~0xffffffffL);
         writePacket(ack, true);

+ 12 - 10
src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java

@@ -27,6 +27,7 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.net.Socket;
 import java.nio.ByteBuffer;
+import java.util.LinkedList;
 import java.util.concurrent.LinkedBlockingQueue;
 
 import org.apache.jute.BinaryInputArchive;
@@ -254,13 +255,14 @@ public class LearnerHandler extends Thread {
             /* we are sending the diff check if we have proposals in memory to be able to 
              * send a diff to the 
              */ 
-            synchronized(leader.zk.committedLog) {
-                if (leader.zk.committedLog.size() != 0) {
-                    if ((leader.zk.maxCommittedLog >= peerLastZxid)
-                            && (leader.zk.minCommittedLog <= peerLastZxid)) {
+            LinkedList<Proposal> proposals = leader.zk.getZKDatabase().getCommittedLog();
+            synchronized(proposals) {
+                if (proposals.size() != 0) {
+                    if ((leader.zk.getZKDatabase().getmaxCommittedLog() >= peerLastZxid)
+                            && (leader.zk.getZKDatabase().getminCommittedLog() <= peerLastZxid)) {
                         packetToSend = Leader.DIFF;
-                        zxidToSend = leader.zk.maxCommittedLog;
-                        for (Proposal propose: leader.zk.committedLog) {
+                        zxidToSend = leader.zk.getZKDatabase().getmaxCommittedLog();
+                        for (Proposal propose: proposals) {
                             if (propose.packet.getZxid() > peerLastZxid) {
                                 queuePacket(propose.packet);
                                 QuorumPacket qcommit = new QuorumPacket(Leader.COMMIT, propose.packet.getZxid(),
@@ -274,7 +276,7 @@ public class LearnerHandler extends Thread {
                 else {
                     logTxns = false;
                 }            
-						}
+            }
             
             //check if we decided to send a diff or we need to send a truncate
             // we avoid using epochs for truncating because epochs make things
@@ -282,11 +284,11 @@ public class LearnerHandler extends Thread {
             // only if we know that there is a committed zxid in the queue that
             // is less than the one the peer has we send a trunc else to make
             // things simple we just send sanpshot.
-            if (logTxns && (peerLastZxid > leader.zk.maxCommittedLog)) {
+            if (logTxns && (peerLastZxid > leader.zk.getZKDatabase().getmaxCommittedLog())) {
                 // this is the only case that we are sure that
                 // we can ask the peer to truncate the log
                 packetToSend = Leader.TRUNC;
-                zxidToSend = leader.zk.maxCommittedLog;
+                zxidToSend = leader.zk.getZKDatabase().getmaxCommittedLog();
                 updates = zxidToSend;
             }
             
@@ -318,7 +320,7 @@ public class LearnerHandler extends Thread {
                         + " zxid of leader is 0x"
                         + Long.toHexString(leaderLastZxid));
                 // Dump data to peer
-                leader.zk.serializeSnapshot(oa);
+                leader.zk.getZKDatabase().serializeSnapshot(oa);
                 oa.writeString("BenWasHere", "signature");
             }
             bufferedOutput.flush();

+ 5 - 12
src/java/main/org/apache/zookeeper/server/quorum/LearnerZooKeeperServer.java

@@ -24,6 +24,7 @@ import org.apache.zookeeper.jmx.MBeanRegistry;
 import org.apache.zookeeper.server.DataTreeBean;
 import org.apache.zookeeper.server.Request;
 import org.apache.zookeeper.server.ServerCnxn;
+import org.apache.zookeeper.server.ZKDatabase;
 import org.apache.zookeeper.server.ZooKeeperServer;
 import org.apache.zookeeper.server.ZooKeeperServerBean;
 import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
@@ -36,8 +37,8 @@ public abstract class LearnerZooKeeperServer extends ZooKeeperServer {
     protected QuorumPeer self;
     
     public LearnerZooKeeperServer(FileTxnSnapLog logFactory, int tickTime,
-            DataTreeBuilder treeBuilder) throws IOException {
-        super(logFactory,tickTime,treeBuilder);
+            DataTreeBuilder treeBuilder, ZKDatabase zkDb) throws IOException {
+        super(logFactory,tickTime,treeBuilder, zkDb);
     }
 
     /**
@@ -68,17 +69,9 @@ public abstract class LearnerZooKeeperServer extends ZooKeeperServer {
         return self.getId();
     }    
     
-    /**
-     * Learners don't make use of this method, only Leaders.
-     */
-    @Override
-    public void addCommittedProposal(Request request) {
-        // Don't do anything!
-    }
-    
     @Override
     protected void createSessionTracker() {
-        sessionTracker = new LearnerSessionTracker(this, sessionsWithTimeouts,
+        sessionTracker = new LearnerSessionTracker(this, getZKDatabase().getSessionWithTimeOuts(),
                 self.getId());
     }
     
@@ -92,7 +85,7 @@ public abstract class LearnerZooKeeperServer extends ZooKeeperServer {
     protected void registerJMX() {
         // register with JMX
         try {
-            jmxDataTreeBean = new DataTreeBean(dataTree);
+            jmxDataTreeBean = new DataTreeBean(getZKDatabase().getDataTree());
             MBeanRegistry.getInstance().register(jmxDataTreeBean, jmxServerBean);
         } catch (Exception e) {
             LOG.warn("Failed to register with JMX", e);

+ 3 - 2
src/java/main/org/apache/zookeeper/server/quorum/ObserverZooKeeperServer.java

@@ -25,6 +25,7 @@ import org.apache.zookeeper.server.FinalRequestProcessor;
 import org.apache.zookeeper.server.Request;
 import org.apache.zookeeper.server.RequestProcessor;
 import org.apache.zookeeper.server.SyncRequestProcessor;
+import org.apache.zookeeper.server.ZKDatabase;
 import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
 
 /**
@@ -48,8 +49,8 @@ public class ObserverZooKeeperServer extends LearnerZooKeeperServer {
         new ConcurrentLinkedQueue<Request>();
         
     ObserverZooKeeperServer(FileTxnSnapLog logFactory, QuorumPeer self,
-            DataTreeBuilder treeBuilder) throws IOException {
-        super(logFactory, self.tickTime, treeBuilder);
+            DataTreeBuilder treeBuilder, ZKDatabase zkDb) throws IOException {
+        super(logFactory, self.tickTime, treeBuilder, zkDb);
         this.self = self;        
     }
     

+ 36 - 22
src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java

@@ -33,7 +33,9 @@ import java.util.Map;
 import org.apache.log4j.Logger;
 import org.apache.zookeeper.jmx.MBeanRegistry;
 import org.apache.zookeeper.jmx.ZKMBeanInfo;
+import org.apache.zookeeper.server.DataTree;
 import org.apache.zookeeper.server.NIOServerCnxn;
+import org.apache.zookeeper.server.ZKDatabase;
 import org.apache.zookeeper.server.ZooKeeperServer;
 import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
 import org.apache.zookeeper.server.persistence.Util;
@@ -73,7 +75,15 @@ public class QuorumPeer extends Thread implements QuorumStats.Provider {
     QuorumBean jmxQuorumBean;
     LocalPeerBean jmxLocalPeerBean;
     LeaderElectionBean jmxLeaderElectionBean;
-
+    
+    /* ZKDatabase is a top level member of quorumpeer 
+     * which will be used in all the zookeeperservers
+     * instantiated later. Also, it is created once on 
+     * bootup and only thrown away in case of a truncate
+     * message from the leader
+     */
+    private ZKDatabase zkDb;
+    
     /**
      * Create an instance of a quorum peer
      */
@@ -351,6 +361,7 @@ public class QuorumPeer extends Thread implements QuorumStats.Provider {
         this.initLimit = initLimit;
         this.syncLimit = syncLimit;        
         this.logFactory = new FileTxnSnapLog(dataLogDir, dataDir);
+        this.zkDb = new ZKDatabase(this.logFactory);
         if(quorumConfig == null)
             this.quorumConfig = new QuorumMaj(countParticipants(quorumPeers));
         else this.quorumConfig = quorumConfig;
@@ -362,6 +373,12 @@ public class QuorumPeer extends Thread implements QuorumStats.Provider {
     
     @Override
     public synchronized void start() {
+        try {
+            zkDb.loadDataBase();
+        } catch(IOException ie) {
+            LOG.fatal("Unable to load database on disk", ie);
+            throw new RuntimeException("Unable to run quorum server ", ie);
+        }
         cnxnFactory.start();        
         startLeaderElection();
         super.start();
@@ -447,27 +464,16 @@ public class QuorumPeer extends Thread implements QuorumStats.Provider {
      * @return the highest zxid for this host
      */
     public long getLastLoggedZxid() {
-        /*
-         * it is possible to have the last zxid with just a snapshot and no log
-         * related to it. one example is during upgrade wherein the there is no
-         * corresponding log to the snapshot. in that case just use the snapshot
-         * zxid
-         */
-
-        File lastSnapshot = null;
-        long maxZxid = -1L;
-        long maxLogZxid = logFactory.getLastLoggedZxid();
+        long lastLogged= -1L;
         try {
-            lastSnapshot = logFactory.findMostRecentSnapshot();
-            if (lastSnapshot != null) {
-                maxZxid = Math.max(Util.getZxidFromName(lastSnapshot.getName(),
-                        "snapshot"), maxLogZxid);
+            if (!zkDb.isInitialized()) {
+                zkDb.loadDataBase();
             }
-        } catch (IOException ie) {
-            LOG.warn("Exception finding last snapshot ", ie);
-            maxZxid = maxLogZxid;
+            lastLogged = zkDb.getDataTreeLastProcessedZxid();
+        } catch(IOException ie) {
+            LOG.warn("Unable to load database ", ie);
         }
-        return maxZxid;
+        return lastLogged;
     }
     
     public Follower follower;
@@ -476,17 +482,17 @@ public class QuorumPeer extends Thread implements QuorumStats.Provider {
 
     protected Follower makeFollower(FileTxnSnapLog logFactory) throws IOException {
         return new Follower(this, new FollowerZooKeeperServer(logFactory, 
-                this,new ZooKeeperServer.BasicDataTreeBuilder()));
+                this,new ZooKeeperServer.BasicDataTreeBuilder(), this.zkDb));
     }
      
     protected Leader makeLeader(FileTxnSnapLog logFactory) throws IOException {
         return new Leader(this, new LeaderZooKeeperServer(logFactory,
-                this,new ZooKeeperServer.BasicDataTreeBuilder()));
+                this,new ZooKeeperServer.BasicDataTreeBuilder(), this.zkDb));
     }
     
     protected Observer makeObserver(FileTxnSnapLog logFactory) throws IOException {
         return new Observer(this, new ObserverZooKeeperServer(logFactory,
-                this, new ZooKeeperServer.BasicDataTreeBuilder()));
+                this, new ZooKeeperServer.BasicDataTreeBuilder(), this.zkDb));
     }
 
     private Election createElectionAlgorithm(int electionAlgorithm){
@@ -878,4 +884,12 @@ public class QuorumPeer extends Thread implements QuorumStats.Provider {
     public FileTxnSnapLog getTxnFactory() {
         return this.logFactory;
     }
+
+    /**
+     * set zk database for this node
+     * @param database
+     */
+    public void setZKDatabase(ZKDatabase database) {
+        this.zkDb = database;
+    }
 }

+ 2 - 0
src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerMain.java

@@ -25,6 +25,7 @@ import javax.management.JMException;
 import org.apache.log4j.Logger;
 import org.apache.zookeeper.jmx.ManagedUtil;
 import org.apache.zookeeper.server.NIOServerCnxn;
+import org.apache.zookeeper.server.ZKDatabase;
 import org.apache.zookeeper.server.ZooKeeperServerMain;
 import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
 import org.apache.zookeeper.server.quorum.QuorumPeerConfig.ConfigException;
@@ -133,6 +134,7 @@ public class QuorumPeerMain {
           quorumPeer.setSyncLimit(config.getSyncLimit());
           quorumPeer.setQuorumVerifier(config.getQuorumVerifier());
           quorumPeer.setCnxnFactory(cnxnFactory);
+          quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
           quorumPeer.setPeerType(config.getPeerType());
   
           quorumPeer.start();

+ 4 - 4
src/java/test/org/apache/zookeeper/test/ACLTest.java

@@ -106,7 +106,7 @@ public class ACLTest extends TestCase implements Watcher {
             zk.create(path, path.getBytes(), Ids.OPEN_ACL_UNSAFE,
                     CreateMode.PERSISTENT);
         }
-        assertTrue("size of the acl map ", (1 == zks.dataTree.longKeyMap.size()));
+        assertTrue("size of the acl map ", (1 == zks.getZKDatabase().getAclSize()));
         for (int j = 100; j < 200; j++) {
             path = "/" + j;
             ACL acl = new ACL();
@@ -119,7 +119,7 @@ public class ACLTest extends TestCase implements Watcher {
             list.add(acl);
             zk.create(path, path.getBytes(), list, CreateMode.PERSISTENT);
         }
-        assertTrue("size of the acl map ", (101 == zks.dataTree.longKeyMap.size()));
+        assertTrue("size of the acl map ", (101 == zks.getZKDatabase().getAclSize()));
         // now shutdown the server and restart it
         f.shutdown();
         assertTrue("waiting for server down",
@@ -138,7 +138,7 @@ public class ACLTest extends TestCase implements Watcher {
                 TimeUnit.MILLISECONDS);
         assertTrue("count == 0", startSignal.getCount() == 0);
 
-        assertTrue("acl map ", (101 == zks.dataTree.longKeyMap.size()));
+        assertTrue("acl map ", (101 == zks.getZKDatabase().getAclSize()));
         for (int j = 200; j < 205; j++) {
             path = "/" + j;
             ACL acl = new ACL();
@@ -151,7 +151,7 @@ public class ACLTest extends TestCase implements Watcher {
             list.add(acl);
             zk.create(path, path.getBytes(), list, CreateMode.PERSISTENT);
         }
-        assertTrue("acl map ", (106 == zks.dataTree.longKeyMap.size()));
+        assertTrue("acl map ", (106 == zks.getZKDatabase().getAclSize()));
 
         zk.close();
 

+ 2 - 2
src/java/test/org/apache/zookeeper/test/FLELostMessageTest.java

@@ -187,8 +187,8 @@ public class FLELostMessageTest extends TestCase {
             LOG.error("Null listener when initializing cnx manager");
         }
         
-        cnxManager.toSend(new Long(1), createMsg(ServerState.LOOKING.ordinal(), 0, -1, 1));
+        cnxManager.toSend(new Long(1), createMsg(ServerState.LOOKING.ordinal(), 0, 0, 1));
         cnxManager.recvQueue.take();
-        cnxManager.toSend(new Long(1), createMsg(ServerState.FOLLOWING.ordinal(), 1, -1, 1));  
+        cnxManager.toSend(new Long(1), createMsg(ServerState.FOLLOWING.ordinal(), 1, 0, 1));  
     }
 }

+ 132 - 0
src/java/test/org/apache/zookeeper/test/ZkDatabaseCorruptionTest.java

@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.net.InetSocketAddress;
+import java.util.HashMap;
+import java.util.LinkedHashSet;
+import java.util.Set;
+
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.server.SyncRequestProcessor;
+import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
+import org.apache.zookeeper.server.quorum.QuorumPeer;
+import org.apache.zookeeper.server.quorum.QuorumPeer.LearnerType;
+import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
+import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState;
+import org.junit.Before;
+
+
+public class ZkDatabaseCorruptionTest extends QuorumBase {
+    protected static final Logger LOG = Logger.getLogger(ZkDatabaseCorruptionTest.class);
+    public static final long CONNECTION_TIMEOUT = ClientTest.CONNECTION_TIMEOUT;
+    
+    private final QuorumBase qb = new QuorumBase();
+    
+    @Before
+    @Override
+    protected void setUp() throws Exception {
+        qb.setUp();
+    }
+        
+    protected void tearDown() throws Exception {
+    }
+    
+    private void corruptFile(File f) throws IOException {
+        RandomAccessFile outFile = new RandomAccessFile(f, "rw");
+        outFile.write("fail servers".getBytes());
+        outFile.close();
+    }
+    
+    private void corruptAllSnapshots(File snapDir) throws IOException {
+        File[] listFiles = snapDir.listFiles();
+        for (File f: listFiles) {
+            if (f.getName().startsWith("snapshot")) {
+                corruptFile(f);
+            }
+        }
+    }
+    
+    public void testCorruption() throws Exception {
+        ClientBase.waitForServerUp(qb.hostPort, 10000);
+        ClientBase.waitForServerUp(qb.hostPort, 10000);
+        ZooKeeper zk = new ZooKeeper(qb.hostPort, 10000, new Watcher() {
+            public void process(WatchedEvent event) {
+            }});
+        SyncRequestProcessor.setSnapCount(100);
+        for (int i = 0; i < 2000; i++) {
+            zk.create("/0-" + i, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        }
+        zk.close();
+        QuorumPeer leader;
+        //find out who is the leader and kill it
+        if ( qb.s5.getPeerState() != ServerState.LEADING) {
+            throw new Exception("the last server is not the leader");
+        }
+        leader = qb.s5;
+        // now corrupt the qurompeer database
+        FileTxnSnapLog snapLog = leader.getTxnFactory();
+        File snapDir= snapLog.getSnapDir();
+        //corrupt all the snapshot in the snapshot directory
+        corruptAllSnapshots(snapDir);
+        qb.shutdownServers();
+        qb.setupServers();
+        qb.s1.start();
+        qb.s2.start();
+        qb.s3.start();
+        qb.s4.start();
+        try {
+            qb.s5.start();
+            assertTrue(false);
+        } catch(RuntimeException re) {
+            LOG.info("Got an error: expected", re);
+        }
+        //waut for servers to be up
+        String[] list = qb.hostPort.split(",");
+        for (int i =0; i < 4; i++) {
+            String hp = list[i];
+          assertTrue("waiting for server up",
+                       ClientBase.waitForServerUp(hp,
+                                    CONNECTION_TIMEOUT));
+            LOG.info(hp + " is accepting client connections");
+        }
+        
+        zk = qb.createClient();
+        SyncRequestProcessor.setSnapCount(100);
+        for (int i = 2000; i < 4000; i++) {
+            zk.create("/0-" + i, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        }
+        zk.close();
+        qb.s1.shutdown();
+        qb.s2.shutdown();
+        qb.s3.shutdown();
+        qb.s4.shutdown();
+    } 
+
+    
+}