Procházet zdrojové kódy

ZOOKEEPER-38. headers (version+) in log/snap files

git-svn-id: https://svn.apache.org/repos/asf/hadoop/zookeeper/trunk@700690 13f79535-47bb-0310-9956-ffa450edef68
Benjamin Reed před 16 roky
rodič
revize
0254f868ca
35 změnil soubory, kde provedl 1887 přidání a 999 odebrání
  1. 3 0
      CHANGES.txt
  2. 0 82
      src/java/OldChangeLog
  3. 1 0
      src/java/jmx/org/apache/zookeeper/jmx/server/DataTreeBean.java
  4. 12 11
      src/java/jmx/org/apache/zookeeper/server/ManagedZooKeeperServer.java
  5. 6 6
      src/java/jmx/org/apache/zookeeper/server/ManagedZooKeeperServerMain.java
  6. 9 11
      src/java/jmx/org/apache/zookeeper/server/ObservableZooKeeperServer.java
  7. 14 6
      src/java/jmx/org/apache/zookeeper/server/quorum/ManagedQuorumPeer.java
  8. 14 5
      src/java/jmx/org/apache/zookeeper/server/quorum/ManagedQuorumPeerMain.java
  9. 3 3
      src/java/jmx/org/apache/zookeeper/server/quorum/ObservableFollowerZooKeeperServer.java
  10. 3 3
      src/java/jmx/org/apache/zookeeper/server/quorum/ObservableLeaderZooKeeperServer.java
  11. 23 14
      src/java/jmx/org/apache/zookeeper/server/quorum/ObservableQuorumPeer.java
  12. 4 5
      src/java/main/org/apache/zookeeper/server/DataTree.java
  13. 1 2
      src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java
  14. 31 22
      src/java/main/org/apache/zookeeper/server/PurgeTxnLog.java
  15. 22 173
      src/java/main/org/apache/zookeeper/server/SyncRequestProcessor.java
  16. 87 467
      src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java
  17. 10 4
      src/java/main/org/apache/zookeeper/server/ZooKeeperServerMain.java
  18. 134 0
      src/java/main/org/apache/zookeeper/server/persistence/FileSnap.java
  19. 480 0
      src/java/main/org/apache/zookeeper/server/persistence/FileTxnLog.java
  20. 236 0
      src/java/main/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java
  21. 59 0
      src/java/main/org/apache/zookeeper/server/persistence/SnapShot.java
  22. 115 0
      src/java/main/org/apache/zookeeper/server/persistence/TxnLog.java
  23. 336 0
      src/java/main/org/apache/zookeeper/server/persistence/Util.java
  24. 13 7
      src/java/main/org/apache/zookeeper/server/quorum/Follower.java
  25. 4 5
      src/java/main/org/apache/zookeeper/server/quorum/FollowerHandler.java
  26. 5 6
      src/java/main/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java
  27. 5 5
      src/java/main/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java
  28. 74 149
      src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java
  29. 5 6
      src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerMain.java
  30. 105 0
      src/java/main/org/apache/zookeeper/server/util/SerializeUtils.java
  31. 56 0
      src/java/test/org/apache/zookeeper/server/DataTreeUnitTest.java
  32. 6 3
      src/java/test/org/apache/zookeeper/server/ZooKeeperServerTest.java
  33. 2 2
      src/java/test/org/apache/zookeeper/test/ClientBase.java
  34. 1 2
      src/java/test/org/apache/zookeeper/test/RecoveryTest.java
  35. 8 0
      src/zookeeper.jute

+ 3 - 0
CHANGES.txt

@@ -96,3 +96,6 @@ Backward compatibile changes:
 
  ZOOKEEPER-117. threading issues in Leader election (Flavio Junqueira and Patrick
  Hunt via breed)
+
+ ZOOKEEPER-38. headers (version+) in log/snap files (Andrew Kornev and Mahadev
+ Konar via breed)

+ 0 - 82
src/java/OldChangeLog

@@ -1,82 +0,0 @@
-Release 2.2.0
-2008-05-08 Andrew Kornev <akornev@users.sourceforge.net>
-
-    * phunt: [1956480] Renamed ZooLog to ZooTrace. Major cleanup of tracing.
-    
-    * fpj: [1958274] got rid of unused vars in leader election
-    
-    * fpj: [1958361] Patch to fix NPE upon access to watcher
-
-    * tedunning: [1951806] Added a sample startup script.
-    
-    * akornev: [1956499] Added the "dist" target to the ant buildfile.
-    
-    * breed: [1947090] Fixed improper timeout tracking at clients.
-    
-    * phunt: [1953737] Millisecond timing in the trace file.
-    
-    * phunt: [1949253] Move to log4j for logging.
-    
-    * phunt: [1942451] build optimization: uptodate check on jute
-    
-    * phunt: [1943392] Test environment changes: unit/func/perf/coverage test
-    
-    * mahadevkonar: [1934859] Performance enhancement for serialization of records.
-    
-    * phunt: [1931630] Fixed ZooKeeperServer loadData() method to optimally 
-      scan for the most recent valid snapshot.
-    
-    * akornev: [1917295] Root node watch not triggered
-
-    * breed: [1912209] Session End Game handling
-      
-    * akornev: [1913967] code refactoring for JMX enablement. Added ServerStats and 
-      QuorumStats classes. Bug fixes: OutOfMemory under heavy load, disk I/O now 
-      uses buffered streams, NIOServerCnxn.Factory shuffles the selector keys 
-      to avoid starvation. Lots of formatting: replaced tabs with whitespaces, 
-      DOS eol style converted to UNIX.
-      
-    * breed: [1882928] Log the uncaught exceptions from the SendThread and EventThread
-    
-    * fpj: [1881204] New leader election algorithm over TCP.
-    
-    * akornev: [1898314] Added support for server version info at runtime; added 
-      the "release" target to ant build file
-    
-    * breed: [1892108] Configurable packet sanity check
-    
-    * akornev: [1889354] JAR manifest file now includes additional metadata: Built-By, 
-      Built-At, Built-On, Implementation-Title, Implementation-Version and 
-      Implementation-Vendor. Use SvnAnt ant task to extract SVN version number.
-      
-    * mahadevkonar: [1881545] fixed logging to output session id in hex
-
-Release 1.1.0
-2008-01-28 Andrew Kornev <akornev@users.sourceforge.net>
-
-    * breed: [1875540] Make sure java client aborts the outgoing packets when 
-      a connection closes.
-      
-    * Jute compiler: emit the #ifdef extern "C" guards in the generated .jute.h
-    
-    * mahadevkonar: [1844561] fast sync between the leader and the follower.
-    
-    * mahadevkonar: [1849444] fixed session id generation routine to generate 
-      unique session ids.
-      
-    * breed: [1845696] fixed a race condition in the quorum server where it 
-      is possible that a create session request can be committed and applied 
-      at a follower before it is applied at the leader.
-      
-    * breed: [1841938] implemented the sync operation to flush updates pedning 
-      on the Leader.
-      
-    * vlarsen: [1835834] fixed a few compiler warnings, removed some @Override 
-      annotations used with interfaces.
-    
-Release 1.0.0
-2007-11-27 Andrew Kornev <akornev@users.sourceforge.net>
-
-    * Updated the jute compiler to emit int32_t vs int in the generated C code
-    
-    * Changed release numbering scheme to match that of the C client

+ 1 - 0
src/java/jmx/org/apache/zookeeper/jmx/server/DataTreeBean.java

@@ -55,6 +55,7 @@ public class DataTreeBean implements DataTreeMXBean, ZKMBeanInfo {
         }
         return stream.size();
       */
+        LOG.warn("Not Implemented");            
         return -1;
     }
 

+ 12 - 11
src/java/jmx/org/apache/zookeeper/server/ManagedZooKeeperServer.java

@@ -18,8 +18,11 @@
 
 package org.apache.zookeeper.server;
 
+import static org.apache.zookeeper.server.ServerConfig.getClientPort;
+
 import java.io.File;
 import java.io.IOException;
+import java.net.URI;
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.log4j.Logger;
@@ -30,6 +33,9 @@ import org.apache.zookeeper.jmx.server.DataTreeBean;
 import org.apache.zookeeper.jmx.server.DataTreeMXBean;
 import org.apache.zookeeper.jmx.server.ZooKeeperServerBean;
 import org.apache.zookeeper.jmx.server.ZooKeeperServerMXBean;
+import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
+import org.apache.zookeeper.server.persistence.Util;
+
 import org.apache.zookeeper.server.util.ConnectionObserver;
 import org.apache.zookeeper.server.util.ObserverManager;
 import org.apache.zookeeper.server.util.ServerObserver;
@@ -103,22 +109,17 @@ public class ManagedZooKeeperServer extends ObservableZooKeeperServer {
         }
     }
 
-    public ManagedZooKeeperServer() {
-        super();
+    public ManagedZooKeeperServer(FileTxnSnapLog logFactory, 
+            int tickTime,DataTreeBuilder treeBuilder) throws IOException {
+        super(logFactory, tickTime,treeBuilder);
         ObserverManager.getInstance().add(new ManagedServerObserver());
         ObserverManager.getInstance().add(new ManagedConnectionObserver());
     }
 
-    public ManagedZooKeeperServer(File dataDir, File dataLogDir, int tickTime, DataTreeBuilder treeBuilder) throws IOException {
-        super(dataDir, dataLogDir, tickTime, treeBuilder);
+    public ManagedZooKeeperServer(FileTxnSnapLog logFactory,
+            DataTreeBuilder treeBuilder) throws IOException {
+        super(logFactory,treeBuilder);
         ObserverManager.getInstance().add(new ManagedServerObserver());
         ObserverManager.getInstance().add(new ManagedConnectionObserver());
     }
-
-    public ManagedZooKeeperServer(File dataDir, File dataLogDir, int tickTime) throws IOException {
-        super(dataDir, dataLogDir, tickTime);
-        ObserverManager.getInstance().add(new ManagedServerObserver());
-        ObserverManager.getInstance().add(new ManagedConnectionObserver());
-    }
-
 }

+ 6 - 6
src/java/jmx/org/apache/zookeeper/server/ManagedZooKeeperServerMain.java

@@ -26,6 +26,8 @@ import java.io.IOException;
 import org.apache.zookeeper.jmx.server.ConnectionMXBean;
 import org.apache.zookeeper.jmx.server.DataTreeMXBean;
 import org.apache.zookeeper.jmx.server.ZooKeeperServerMXBean;
+import org.apache.zookeeper.server.persistence.FileTxnLog;
+import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
 import org.apache.zookeeper.server.util.ZooKeeperObserverManager;
 
 /**
@@ -65,12 +67,10 @@ public class ManagedZooKeeperServerMain extends ZooKeeperServerMain {
                 return new ObservableNIOServerCnxn.Factory(getClientPort());
             }
             public ZooKeeperServer createServer() throws IOException {
-                ManagedZooKeeperServer zks = new ManagedZooKeeperServer();
-                zks.setDataDir(new File(ServerConfig.getDataDir()));
-                zks.setDataLogDir(new File(ServerConfig.getDataLogDir()));
-                zks.setClientPort(ServerConfig.getClientPort());
-                // TODO: we may want to build an observable/managed data tree here instead
-                zks.setTreeBuilder(new ZooKeeperServer.BasicDataTreeBuilder());
+                ManagedZooKeeperServer zks = new ManagedZooKeeperServer(
+                        new FileTxnSnapLog(new File(ServerConfig.getDataDir()),
+                        new File(ServerConfig.getDataLogDir())),
+                new ZooKeeperServer.BasicDataTreeBuilder());
                 return zks;
             }
         });

+ 9 - 11
src/java/jmx/org/apache/zookeeper/server/ObservableZooKeeperServer.java

@@ -18,11 +18,13 @@
 
 package org.apache.zookeeper.server;
 
-import java.io.File;
 import java.io.IOException;
 
+
+import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
 import org.apache.zookeeper.server.util.ObserverManager;
 import org.apache.zookeeper.server.util.ServerObserver;
+
 /**
  * The observable server broadcast notifications when its state changes. 
  * 
@@ -33,17 +35,13 @@ import org.apache.zookeeper.server.util.ServerObserver;
 public class ObservableZooKeeperServer extends ZooKeeperServer{
 
     private ZooKeeperObserverNotifier notifier=new ZooKeeperObserverNotifier(this);
-    
-    public ObservableZooKeeperServer() {
-        super();
+    public ObservableZooKeeperServer(FileTxnSnapLog logFactory, 
+            int tickTime,DataTreeBuilder treeBuilder) throws IOException {
+        super(logFactory, tickTime,treeBuilder);
     }
-
-    public ObservableZooKeeperServer(File dataDir, File dataLogDir, int tickTime, DataTreeBuilder treeBuilder) throws IOException {
-        super(dataDir, dataLogDir, tickTime, treeBuilder);
-    }
-
-    public ObservableZooKeeperServer(File dataDir, File dataLogDir, int tickTime) throws IOException {
-        super(dataDir, dataLogDir, tickTime);
+    public ObservableZooKeeperServer(FileTxnSnapLog logFactory,
+            DataTreeBuilder treeBuilder) throws IOException {
+        super(logFactory,treeBuilder);
     }
 
     public void shutdown() {

+ 14 - 6
src/java/jmx/org/apache/zookeeper/server/quorum/ManagedQuorumPeer.java

@@ -46,6 +46,8 @@ import org.apache.zookeeper.server.NIOServerCnxn;
 import org.apache.zookeeper.server.ServerCnxn;
 import org.apache.zookeeper.server.ZooKeeperServer;
 import org.apache.zookeeper.server.ZooTrace;
+import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
+import org.apache.zookeeper.server.persistence.Util;
 import org.apache.zookeeper.server.util.ConnectionObserver;
 import org.apache.zookeeper.server.util.ObserverManager;
 import org.apache.zookeeper.server.util.QuorumPeerObserver;
@@ -207,15 +209,21 @@ public class ManagedQuorumPeer extends ObservableQuorumPeer {
         setupObservers();
     }
 
-    public ManagedQuorumPeer(ArrayList<QuorumServer> quorumPeers, File dataDir, File dataLogDir, int clientPort, int electionAlg, int electionPort, long myid, int tickTime, int initLimit,
-                                int syncLimit) throws IOException {
-        super(quorumPeers, dataDir, dataLogDir, clientPort, electionAlg, electionPort, myid, tickTime, initLimit, syncLimit);
+    public ManagedQuorumPeer(ArrayList<QuorumServer> quorumPeers, 
+            File dataDir, File dataLogDir, int clientPort, 
+            int electionAlg, int electionPort, long myid, int tickTime, int initLimit,
+            int syncLimit) throws IOException {
+        super(quorumPeers, dataDir, dataLogDir, clientPort, 
+                electionAlg, electionPort, myid, tickTime, initLimit, syncLimit);
         setupObservers();
     }
 
-    public ManagedQuorumPeer(ArrayList<QuorumServer> quorumPeers, File dataDir, File dataLogDir, int electionType, int electionPort, long myid, int tickTime, int initLimit, int syncLimit,
-                                NIOServerCnxn.Factory cnxnFactory) throws IOException {
-        super(quorumPeers, dataDir, dataLogDir, electionType, electionPort, myid, tickTime, initLimit, syncLimit, cnxnFactory);
+    public ManagedQuorumPeer(ArrayList<QuorumServer> quorumPeers, 
+            File dataDir, File dataLogDir, int electionType, int electionPort,
+            long myid, int tickTime, int initLimit, int syncLimit,
+            NIOServerCnxn.Factory cnxnFactory) throws IOException {
+        super(quorumPeers, dataDir, dataLogDir, electionType, electionPort,
+                myid, tickTime, initLimit, syncLimit, cnxnFactory);
         setupObservers();
     }
 

+ 14 - 5
src/java/jmx/org/apache/zookeeper/server/quorum/ManagedQuorumPeerMain.java

@@ -35,6 +35,14 @@ import org.apache.zookeeper.server.ManagedZooKeeperServerMain;
 import org.apache.zookeeper.server.NIOServerCnxn;
 import org.apache.zookeeper.server.ObservableNIOServerCnxn;
 import org.apache.zookeeper.server.ServerConfig;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.apache.zookeeper.server.ZooTrace;
+import org.apache.zookeeper.server.persistence.FileTxnLog;
+import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
+import org.apache.zookeeper.server.util.ConnectionObserver;
+import org.apache.zookeeper.server.util.ObserverManager;
+import org.apache.zookeeper.server.util.QuorumPeerObserver;
+import org.apache.zookeeper.server.util.ServerObserver;
 import org.apache.zookeeper.server.util.ZooKeeperObserverManager;
 
 /**
@@ -82,12 +90,13 @@ public class ManagedQuorumPeerMain {
             ZooKeeperObserverManager.setAsConcrete();
             runPeer(new QuorumPeer.Factory() {
                 public QuorumPeer create(NIOServerCnxn.Factory cnxnFactory)
-                        throws IOException {
-                    
+                throws IOException {
                     ManagedQuorumPeer peer = new ManagedQuorumPeer();
                     peer.setClientPort(ServerConfig.getClientPort());
-                    peer.setDataDir(new File(ServerConfig.getDataDir()));
-                    peer.setDataLogDir(new File(ServerConfig.getDataLogDir()));
+                    FileTxnSnapLog factory = new FileTxnSnapLog(new 
+                            File(ServerConfig.getDataLogDir()), new  
+                                    File(ServerConfig.getDataDir()));
+                    peer.setTxnFactory(factory);
                     peer.setQuorumPeers(QuorumPeerConfig.getServers());
                     peer.setElectionPort(QuorumPeerConfig.getElectionPort());
                     peer.setElectionType(QuorumPeerConfig.getElectionAlg());
@@ -97,7 +106,7 @@ public class ManagedQuorumPeerMain {
                     peer.setSyncLimit(QuorumPeerConfig.getSyncLimit());
                     peer.setCnxnFactory(cnxnFactory);
                     return peer;
-                    
+
                 }
                 public NIOServerCnxn.Factory createConnectionFactory() throws IOException {
                     return new ObservableNIOServerCnxn.Factory(getClientPort());

+ 3 - 3
src/java/jmx/org/apache/zookeeper/server/quorum/ObservableFollowerZooKeeperServer.java

@@ -18,10 +18,10 @@
 
 package org.apache.zookeeper.server.quorum;
 
-import java.io.File;
 import java.io.IOException;
 
 import org.apache.zookeeper.server.ZooKeeperObserverNotifier;
+import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
 import org.apache.zookeeper.server.util.ObserverManager;
 import org.apache.zookeeper.server.util.ServerObserver;
 
@@ -37,9 +37,9 @@ public class ObservableFollowerZooKeeperServer extends FollowerZooKeeperServer {
 
     private ZooKeeperObserverNotifier notifier;
 
-    public ObservableFollowerZooKeeperServer(File dataDir, File dataLogDir,
+    public ObservableFollowerZooKeeperServer(FileTxnSnapLog logFactory,
             QuorumPeer self, DataTreeBuilder treeBuilder) throws IOException {
-        super(dataDir, dataLogDir, self, treeBuilder);
+        super(logFactory, self, treeBuilder);
         notifier=new ZooKeeperObserverNotifier(this);
     }
     

+ 3 - 3
src/java/jmx/org/apache/zookeeper/server/quorum/ObservableLeaderZooKeeperServer.java

@@ -18,10 +18,10 @@
 
 package org.apache.zookeeper.server.quorum;
 
-import java.io.File;
 import java.io.IOException;
 
 import org.apache.zookeeper.server.ZooKeeperObserverNotifier;
+import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
 import org.apache.zookeeper.server.util.ObserverManager;
 import org.apache.zookeeper.server.util.ServerObserver;
 
@@ -37,9 +37,9 @@ public class ObservableLeaderZooKeeperServer extends LeaderZooKeeperServer {
 
     private ZooKeeperObserverNotifier notifier;
 
-    public ObservableLeaderZooKeeperServer(File dataDir, File dataLogDir,
+    public ObservableLeaderZooKeeperServer(FileTxnSnapLog logFactory,
             QuorumPeer self, DataTreeBuilder treeBuilder) throws IOException {
-        super(dataDir, dataLogDir, self, treeBuilder);
+        super(logFactory, self, treeBuilder);
         notifier=new ZooKeeperObserverNotifier(this);
     }
 

+ 23 - 14
src/java/jmx/org/apache/zookeeper/server/quorum/ObservableQuorumPeer.java

@@ -24,6 +24,7 @@ import java.util.ArrayList;
 
 import org.apache.zookeeper.server.NIOServerCnxn;
 import org.apache.zookeeper.server.ZooKeeperServer;
+import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
 import org.apache.zookeeper.server.util.EventInfo;
 import org.apache.zookeeper.server.util.ObservableComponent;
 import org.apache.zookeeper.server.util.ObserverManager;
@@ -58,32 +59,40 @@ public class ObservableQuorumPeer extends QuorumPeer implements ObservableCompon
         };
         public abstract void dispatch(ObservableQuorumPeer peer,QuorumPeerObserver ob);
     }
-
-
     public ObservableQuorumPeer() {
         super();
     }
 
-    public ObservableQuorumPeer(ArrayList<QuorumServer> quorumPeers, File dataDir, File dataLogDir, int clientPort, int electionAlg, int electionPort, long myid, int tickTime, int initLimit,
-                                int syncLimit) throws IOException {
-        super(quorumPeers, dataDir, dataLogDir, clientPort, electionAlg, electionPort, myid, tickTime, initLimit, syncLimit);
+    public ObservableQuorumPeer(ArrayList<QuorumServer> quorumPeers, File dataDir,
+            File dataLogDir, int clientPort, int electionAlg,
+            int electionPort, long myid, int tickTime, int initLimit,
+            int syncLimit) throws IOException {
+        super(quorumPeers, dataDir, dataLogDir, clientPort, 
+                electionAlg, electionPort, myid, tickTime, initLimit, syncLimit);
     }
 
-    public ObservableQuorumPeer(ArrayList<QuorumServer> quorumPeers, File dataDir, File dataLogDir, int electionType, int electionPort, long myid, int tickTime, int initLimit, int syncLimit,
-                                NIOServerCnxn.Factory cnxnFactory) throws IOException {
-        super(quorumPeers, dataDir, dataLogDir, electionType, electionPort, myid, tickTime, initLimit, syncLimit, cnxnFactory);
+    public ObservableQuorumPeer(ArrayList<QuorumServer> quorumPeers,
+            File dataDir, File dataLogDir, int electionType, 
+            int electionPort, long myid, int tickTime, 
+            int initLimit, int syncLimit,
+            NIOServerCnxn.Factory cnxnFactory) throws IOException {
+        super(quorumPeers, dataDir, dataLogDir, electionType, electionPort,
+                myid, tickTime, initLimit, syncLimit, cnxnFactory);
     }
 
+
     // instantiate an observable follower
-    protected Follower makeFollower(File dataDir,File dataLogDir) throws IOException {
-        return new ObservableFollower(this, new ObservableFollowerZooKeeperServer(dataDir,
-                dataLogDir, this,new ZooKeeperServer.BasicDataTreeBuilder()));
+    protected Follower makeFollower(FileTxnSnapLog logFactory) throws IOException {
+        return new ObservableFollower(this, 
+                new ObservableFollowerZooKeeperServer(logFactory, this,
+                        new ZooKeeperServer.BasicDataTreeBuilder()));
     }
 
     // instantiate an observable leader
-    protected Leader makeLeader(File dataDir,File dataLogDir) throws IOException {
-        return new ObservableLeader(this, new ObservableLeaderZooKeeperServer(dataDir, 
-                dataLogDir,this,new ZooKeeperServer.BasicDataTreeBuilder()));
+    protected Leader makeLeader(FileTxnSnapLog logFactory) throws IOException {
+        return new ObservableLeader(this, 
+                new ObservableLeaderZooKeeperServer(logFactory, 
+                        this,new ZooKeeperServer.BasicDataTreeBuilder()));
     }
 
     public void run() {

+ 4 - 5
src/java/main/org/apache/zookeeper/server/DataTree.java

@@ -285,7 +285,8 @@ public class DataTree {
         }
     }
 
-    public ArrayList<String> getChildren(String path, Stat stat, Watcher watcher) throws KeeperException.NoNodeException {
+    public List<String> getChildren(String path, Stat stat, Watcher watcher) 
+            throws KeeperException.NoNodeException {
         DataNode n = nodes.get(path);
         if (n == null) {
             throw new KeeperException.NoNodeException();
@@ -462,8 +463,7 @@ public class DataTree {
      * @throws IOException
      * @throws InterruptedException
      */
-    void serializeNode(OutputArchive oa, StringBuilder path)
-            throws IOException, InterruptedException {
+    void serializeNode(OutputArchive oa, StringBuilder path) throws IOException {
         String pathString = path.toString();
         DataNode node = getNode(pathString);
         if (node == null) {
@@ -494,8 +494,7 @@ public class DataTree {
 
     public boolean initialized = false;
 
-    public void serialize(OutputArchive oa, String tag) throws IOException,
-            InterruptedException {
+    public void serialize(OutputArchive oa, String tag) throws IOException {
         scount = 0;
         serializeNode(oa, new StringBuilder(""));
         // / marks end of stream

+ 1 - 2
src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java

@@ -20,7 +20,6 @@ package org.apache.zookeeper.server;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.log4j.Logger;
@@ -212,7 +211,7 @@ public class FinalRequestProcessor implements RequestProcessor {
                 }
                 PrepRequestProcessor.checkACL(zks, n.acl, ZooDefs.Perms.READ,
                         request.authInfo);
-                ArrayList<String> children = zks.dataTree.getChildren(
+                List<String> children = zks.dataTree.getChildren(
                         getChildrenRequest.getPath(), stat, getChildrenRequest
                                 .getWatch() ? request.cnxn : null);
                 rsp = new GetChildrenResponse(children);

+ 31 - 22
src/java/main/org/apache/zookeeper/server/PurgeTxnLog.java

@@ -22,16 +22,21 @@ import java.io.File;
 import java.io.FileFilter;
 import java.io.IOException;
 import java.text.DateFormat;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
+import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
+import org.apache.zookeeper.server.persistence.Util;
+
 public class PurgeTxnLog {
 
     static void printUsage(){
-        System.out.println("PurgeTxnLog dataLogDir ");
+        System.out.println("PurgeTxnLog dataLogDir [snapDir]");
         System.out.println("\tdataLogDir -- path to the txn log directory");
+        System.out.println("\tsnapDir -- path to the snapshot directory");
         System.exit(1);
     }
     /**
@@ -39,38 +44,42 @@ public class PurgeTxnLog {
      *     dataLogDir -- txn log directory
      */
     public static void main(String[] args) throws IOException {
-        if(args.length!=1)
+        if(args.length<1 || args.length>2)
             printUsage();
 
         File dataDir=new File(args[0]);
-
-        // find the most recent valid snapshot
-        long highestZxid = -1;
-        for (File f : dataDir.listFiles()) {
-            long zxid = ZooKeeperServer.isValidSnapshot(f);
-            if (zxid > highestZxid) {
-                highestZxid = zxid;
+        File snapDir=dataDir;
+        if(args.length==2){
+            snapDir=new File(args[1]);
             }
-        }
-        // found any valid snapshots?
-        if(highestZxid==-1)
-            return;  // no snapshots
-
+        FileTxnSnapLog txnLog = new FileTxnSnapLog(dataDir, snapDir);
+        
+        // found any valid recent snapshots?
+        
         // files to exclude from deletion
         Set<File> exc=new HashSet<File>();
-        exc.add(new File(dataDir, "snapshot."+Long.toHexString(highestZxid)));
-        exc.addAll(Arrays.asList(ZooKeeperServer.getLogFiles(dataDir.listFiles(),highestZxid)));
+        File snapShot = txnLog.findMostRecentSnapshot();
+        exc.add(txnLog.findMostRecentSnapshot());
+        long zxid = Util.getZxidFromName(snapShot.getName(),"snapshot");
+        exc.addAll(Arrays.asList(txnLog.getSnapshotLogs(zxid)));
 
         final Set<File> exclude=exc;
-        List<File> files=Arrays.asList(dataDir.listFiles(new FileFilter(){
+        class MyFileFilter implements FileFilter{
+            private final String prefix;
+            MyFileFilter(String prefix){
+                this.prefix=prefix;
+            }
             public boolean accept(File f){
-                if(!f.getName().startsWith("log.") &&
-                        !f.getName().startsWith("snapshot."))
-                    return false;
-                if(exclude.contains(f))
+                if(!f.getName().startsWith(prefix) || exclude.contains(f))
                     return false;
                 return true;
-            }}));
+            }
+        }
+        // add all non-excluded log files
+        List<File> files=new ArrayList<File>(
+                Arrays.asList(dataDir.listFiles(new MyFileFilter("log."))));
+        // add all non-excluded snapshot files to the deletion list
+        files.addAll(Arrays.asList(snapDir.listFiles(new MyFileFilter("snapshot."))));
         // remove the old files
         for(File f: files)
         {

+ 22 - 173
src/java/main/org/apache/zookeeper/server/SyncRequestProcessor.java

@@ -18,22 +18,13 @@
 
 package org.apache.zookeeper.server;
 
-import java.io.ByteArrayOutputStream;
-import java.io.File;
-import java.io.FileOutputStream;
 import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
 import java.util.LinkedList;
 import java.util.Random;
 import java.util.concurrent.LinkedBlockingQueue;
 
 import org.apache.log4j.Logger;
 
-import org.apache.jute.BinaryOutputArchive;
-import org.apache.jute.Record;
-import org.apache.zookeeper.server.util.Profiler;
-import org.apache.zookeeper.txn.TxnHeader;
 
 /**
  * This RequestProcessor logs requests to disk. It batches the requests to do
@@ -42,57 +33,24 @@ import org.apache.zookeeper.txn.TxnHeader;
  */
 public class SyncRequestProcessor extends Thread implements RequestProcessor {
     private static final Logger LOG = Logger.getLogger(SyncRequestProcessor.class);
-
-    static final int PADDING_TIMEOUT=1000;
-    ZooKeeperServer zks;
-
-    LinkedBlockingQueue<Request> queuedRequests = new LinkedBlockingQueue<Request>();
-
-    static boolean forceSync;
-    static {
-        forceSync = !System.getProperty("zookeeper.forceSync", "yes").equals(
-                "no");
-    }
-
-    private static long preAllocSize = 65536 * 1024;
-    static {
-        String size = System.getProperty("zookeeper.preAllocSize");
-        if (size != null) {
-            try {
-                preAllocSize = Long.parseLong(size) * 1024;
-            } catch (NumberFormatException e) {
-                LOG.warn(size 
-                        + " is not a valid value for zookeeper.preAllocSize");
-            }
-        }
-    }
-    
+    private ZooKeeperServer zks;
+    private LinkedBlockingQueue<Request> queuedRequests = new LinkedBlockingQueue<Request>();
+    private RequestProcessor nextProcessor;
+    boolean timeToDie = false;
     /**
-     * Change the data log pre-allocation size on the fly.
-     * 
-     * You might want to do this on systems (Windows esp) where preallocation
-     * is slow, WARN messages are output the log if preAllocation is taking
-     * too long -- will stall the request pipeline.
-     * 
-     * This value can also be set through the "zookeeper.preAllocSize" (also
-     * in K bytes) environment variable when starting the jvm.
-     * 
-     * @param size size in K bytes to change the log prealloc to
+     * Transactions that have been written and are waiting to be flushed to
+     * disk. Basically this is the list of SyncItems whose callbacks will be
+     * invoked after flush returns successfully.
      */
-    public static void setPreAllocSize(long size) {
-        preAllocSize = size * 1024; 
-    }
-
+    private LinkedList<Request> toFlush = new LinkedList<Request>();
+    private Random r = new Random(System.nanoTime());
+    private int logCount = 0;
     /**
      * The number of log entries to log before starting a snapshot
      */
-    static public int snapCount = ZooKeeperServer.getSnapCount();
-
-    Thread snapInProcess;
-
-    RequestProcessor nextProcessor;
+    public static int snapCount = ZooKeeperServer.getSnapCount();
 
-    boolean timeToDie = false;
+    private Request requestOfDeath = Request.requestOfDeath;
 
     public SyncRequestProcessor(ZooKeeperServer zks,
             RequestProcessor nextProcessor) {
@@ -102,45 +60,13 @@ public class SyncRequestProcessor extends Thread implements RequestProcessor {
         start();
     }
 
-    /**
-     * Transactions that have been written and are waiting to be flushed to
-     * disk. Basically this is the list of SyncItems whose callbacks will be
-     * invoked after flush returns successfully.
-     */
-    LinkedList<Request> toFlush = new LinkedList<Request>();
-
-    FileOutputStream logStream;
-
-    BinaryOutputArchive logArchive;
-
-    Random r = new Random(System.nanoTime());
-
-    int logCount = 0;
-
-    Request requestOfDeath = Request.requestOfDeath;
-
-    private static ByteBuffer fill = ByteBuffer.allocateDirect(1024);
-
-    LinkedList<FileOutputStream> streamsToFlush = new LinkedList<FileOutputStream>();
-
-    private long padLogFile(FileChannel fc,long fileSize) throws IOException{
-        long position = fc.position();
-        // We pad the file in 1M chunks to avoid syncing to
-        // write the new filesize.
-        if (position + 4096 >= fileSize) {
-            fileSize = fileSize + preAllocSize;
-            fill.position(0);
-            fc.write(fill, fileSize);
-        }
-        return fileSize;
+    private void startSnapshot() throws IOException {
+        zks.takeSnapshot();
     }
 
     @Override
     public void run() {
         try {
-            long fileSize = 0;
-            long lastZxidSeen = -1;
-            FileChannel fc = null;
             while (true) {
                 Request si = null;
                 if (toFlush.isEmpty()) {
@@ -156,75 +82,16 @@ public class SyncRequestProcessor extends Thread implements RequestProcessor {
                     break;
                 }
                 if (si != null) {
-                    // LOG.warn("Sync>>> cxid = " + si.cxid + " type = " +
-                    // si.type + " id = " + si.sessionId + " zxid = " +
-                    // Long.toHexString(si.zxid));
-                    ZooTrace.logRequest(LOG, ZooTrace.CLIENT_REQUEST_TRACE_MASK,
-                            'S', si, "");
-                    TxnHeader hdr = si.hdr;
-                    if (hdr != null) {
-                        if (hdr.getZxid() <= lastZxidSeen) {
-                            LOG.warn("Current zxid " + hdr.getZxid()
-                                    + " is <= " + lastZxidSeen + " for "
-                                    + hdr.getType());
-                        }
-                        Record txn = si.txn;
-                        if (logStream == null) {
-                            fileSize = 0;
-                            logStream = new FileOutputStream(new File(
-                                    zks.dataLogDir, ZooKeeperServer
-                                            .getLogName(hdr.getZxid())));
-                            synchronized (streamsToFlush) {
-                                streamsToFlush.add(logStream);
-                            }
-                            fc = logStream.getChannel();
-                            logArchive = BinaryOutputArchive
-                                    .getArchive(logStream);
-                        }
-                        final long fsize=fileSize;
-                        final FileChannel ffc=fc;
-                        fileSize = Profiler.profile(
-                            new Profiler.Operation<Long>() {
-                                public Long execute() throws Exception {
-                                    return SyncRequestProcessor.this
-                                            .padLogFile(ffc, fsize);
-                                }
-                            }, PADDING_TIMEOUT,
-                            "Logfile padding exceeded time threshold"
-                        );
-                        ByteArrayOutputStream baos = new ByteArrayOutputStream();
-                        BinaryOutputArchive boa = BinaryOutputArchive
-                                .getArchive(baos);
-                        hdr.serialize(boa, "hdr");
-                        if (txn != null) {
-                            txn.serialize(boa, "txn");
-                        }
-                        logArchive.writeBuffer(baos.toByteArray(), "txnEntry");
-                        logArchive.writeByte((byte) 0x42, "EOR");
+                    zks.getLogWriter().append(si);
                         logCount++;
                         if (logCount > snapCount / 2
                                 && r.nextInt(snapCount / 2) == 0) {
-                            // We just want one snapshot going at a time
-                            if (snapInProcess != null
-                                    && snapInProcess.isAlive()) {
-                                LOG.warn("Too busy to snap, skipping");
-                            } else {
-                                logStream = null;
-                                logArchive = null;
-                                snapInProcess = new Thread() {
-                                    public void run() {
-                                        try {
-                                            zks.snapshot();
-                                        } catch (Exception e) {
-                                            LOG.warn("Unexpected exception",e);
-                                        }
-                                    }
-                                };
-                                snapInProcess.start();
-                            }
+                            // roll the log
+                            zks.getLogWriter().rollLog();
+                            // take a snapshot
+                            startSnapshot();
                             logCount = 0;
                         }
-                    }
                     toFlush.add(si);
                     if (toFlush.size() > 1000) {
                         flush(toFlush);
@@ -236,33 +103,15 @@ public class SyncRequestProcessor extends Thread implements RequestProcessor {
             System.exit(11);
         }
         ZooTrace.logTraceMessage(LOG, ZooTrace.getTextTraceLevel(),
-                                     "SyncRequestProcessor exiyed!");
+                                     "SyncRequestProcessor exited!");
     }
 
     @SuppressWarnings("unchecked")
     private void flush(LinkedList<Request> toFlush) throws IOException {
-        if (toFlush.size() == 0) {
+        if (toFlush.size() == 0)
             return;
-        }
 
-        LinkedList<FileOutputStream> streamsToFlushNow;
-        synchronized (streamsToFlush) {
-            streamsToFlushNow = (LinkedList<FileOutputStream>) streamsToFlush
-                    .clone();
-        }
-        for (FileOutputStream fos : streamsToFlushNow) {
-            fos.flush();
-            if (forceSync) {
-                fos.getChannel().force(false);
-            }
-        }
-        while (streamsToFlushNow.size() > 1) {
-            FileOutputStream fos = streamsToFlushNow.removeFirst();
-            fos.close();
-            synchronized (streamsToFlush) {
-                streamsToFlush.remove(fos);
-            }
-        }
+        zks.getLogWriter().commit();
         while (toFlush.size() > 0) {
             Request i = toFlush.remove();
             nextProcessor.processRequest(i);

+ 87 - 467
src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java

@@ -18,53 +18,36 @@
 
 package org.apache.zookeeper.server;
 
-import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
-import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
-import java.io.EOFException;
 import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
 import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.RandomAccessFile;
 import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Random;
-import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.LinkedBlockingQueue;
 
-import org.apache.log4j.Logger;
-
 import org.apache.jute.BinaryInputArchive;
 import org.apache.jute.BinaryOutputArchive;
 import org.apache.jute.InputArchive;
+import org.apache.jute.OutputArchive;
 import org.apache.jute.Record;
+import org.apache.log4j.Logger;
 import org.apache.zookeeper.ZooDefs.OpCode;
 import org.apache.zookeeper.data.ACL;
 import org.apache.zookeeper.data.Id;
 import org.apache.zookeeper.data.Stat;
 import org.apache.zookeeper.proto.RequestHeader;
 import org.apache.zookeeper.server.SessionTracker.SessionExpirer;
+import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
+import org.apache.zookeeper.server.persistence.FileTxnSnapLog.PlayBackListener;
 import org.apache.zookeeper.server.quorum.Leader;
 import org.apache.zookeeper.server.quorum.QuorumPacket;
 import org.apache.zookeeper.server.quorum.Leader.Proposal;
-import org.apache.zookeeper.txn.CreateSessionTxn;
-import org.apache.zookeeper.txn.CreateTxn;
-import org.apache.zookeeper.txn.DeleteTxn;
-import org.apache.zookeeper.txn.ErrorTxn;
-import org.apache.zookeeper.txn.SetACLTxn;
-import org.apache.zookeeper.txn.SetDataTxn;
+import org.apache.zookeeper.server.util.SerializeUtils;
 import org.apache.zookeeper.txn.TxnHeader;
 
 /**
@@ -108,14 +91,7 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
     private DataTreeBuilder treeBuilder;
     public DataTree dataTree;
     protected SessionTracker sessionTracker;
-    /**
-     * directory for storing the snapshot
-     */
-    File dataDir;
-    /**
-     * directoy for storing the log tnxns
-     */
-    File dataLogDir;
+    private FileTxnSnapLog txnLogFactory = null;
     protected ConcurrentHashMap<Long, Integer> sessionsWithTimeouts;
     protected long hzxid = 0;
     final public static Exception ok = new Exception("No prob");
@@ -131,12 +107,12 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
     List<ChangeRecord> outstandingChanges = new ArrayList<ChangeRecord>();
     private NIOServerCnxn.Factory serverCnxnFactory;
     private int clientPort;
-
- 
+    
     void removeCnxn(ServerCnxn cnxn) {
         dataTree.removeCnxn(cnxn);
     }
 
+ 
     /**
      * Creates a ZooKeeperServer instance. Nothing is setup, use the setX
      * methods to prepare the instance (eg datadir, datalogdir, ticktime, 
@@ -148,6 +124,8 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
         ServerStats.getInstance().setStatsProvider(this);
         treeBuilder = new BasicDataTreeBuilder();
     }
+
+    
     /**
      * Creates a ZooKeeperServer instance. It sets everything up, but doesn't
      * actually start listening for clients until run() is invoked.
@@ -155,27 +133,25 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
      * @param dataDir the directory to put the data
      * @throws IOException
      */
-    public ZooKeeperServer(File dataDir, File dataLogDir, int tickTime,
+    public ZooKeeperServer(FileTxnSnapLog txnLogFactory, int tickTime,
             DataTreeBuilder treeBuilder) throws IOException {
-        this.dataDir = dataDir;
-        this.dataLogDir = dataLogDir;
+        this.txnLogFactory=txnLogFactory;
         this.tickTime = tickTime;
         this.treeBuilder = treeBuilder;
         ServerStats.getInstance().setStatsProvider(this);
         
-        LOG.info("Created server with dataDir:" + dataDir 
-                + " dataLogDir:" + dataLogDir
-                + " tickTime:" + tickTime);
+        LOG.info("Created server");
     }
 
     /**
      * This constructor is for backward compatibility with the existing unit
      * test code.
+     * It defaults to FileLogProvider persistence provider.
      */
-    public ZooKeeperServer(File dataDir, File dataLogDir, int tickTime)
-        throws IOException 
-    {
-        this(dataDir, dataLogDir, tickTime, new BasicDataTreeBuilder());
+    public ZooKeeperServer(File snapDir, File logDir, int tickTime)
+            throws IOException {
+        this(new FileTxnSnapLog(snapDir,logDir),
+                tickTime,new BasicDataTreeBuilder());
     }
 
     /**
@@ -183,180 +159,28 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
      *
      * @throws IOException
      */
-    public ZooKeeperServer(DataTreeBuilder treeBuilder) throws IOException {
-        this(new File(ServerConfig.getDataDir()), new File(ServerConfig
-                .getDataLogDir()), DEFAULT_TICK_TIME, treeBuilder);
-    }
-
-    public static long getZxidFromName(String name, String prefix) {
-        long zxid = -1;
-        String nameParts[] = name.split("\\.");
-        if (nameParts.length == 2 && nameParts[0].equals(prefix)) {
-            try {
-                zxid = Long.parseLong(nameParts[1], 16);
-            } catch (NumberFormatException e) {
-                LOG.warn("unable to parse zxid string into long: "
-                        + nameParts[1]);
-            }
-        }
-        return zxid;
-    }
-
-    static public long isValidSnapshot(File f) throws IOException {
-        long zxid = getZxidFromName(f.getName(), "snapshot");
-        if (zxid == -1)
-            return -1;
-
-        // Check for a valid snapshot
-        RandomAccessFile raf = new RandomAccessFile(f, "r");
-        try {
-            raf.seek(raf.length() - 5);
-            byte bytes[] = new byte[5];
-            raf.read(bytes);
-            ByteBuffer bb = ByteBuffer.wrap(bytes);
-            int len = bb.getInt();
-            byte b = bb.get();
-            if (len != 1 || b != '/') {
-                LOG.warn("Invalid snapshot " + f + " len = " + len
-                        + " byte = " + (b & 0xff));
-                return -1;
-            }
-        } finally {
-            raf.close();
-        }
-
-        return zxid;
-    }
-
-    /**
-     * Compare file file names of form "prefix.version". Sort order result
-     * returned in order of version.
-     */
-    private static class DataDirFileComparator implements Comparator<File> {
-        private String prefix;
-        private boolean ascending;
-        public DataDirFileComparator(String prefix, boolean ascending) {
-            this.prefix = prefix;
-            this.ascending = ascending;
-        }
-
-        public int compare(File o1, File o2) {
-            long z1 = getZxidFromName(o1.getName(), prefix);
-            long z2 = getZxidFromName(o2.getName(), prefix);
-            int result = z1 < z2 ? -1 : (z1 > z2 ? 1 : 0);
-            return ascending ? result : -result;
-        }
-    }
-
-    /**
-     * Sort the list of files. Recency as determined by the version component
-     * of the file name.
-     *
-     * @param files array of files
-     * @param prefix files not matching this prefix are assumed to have a
-     * version = -1)
-     * @param ascending true sorted in ascending order, false results in
-     * descending order
-     * @return sorted input files
-     */
-    static List<File>
-        sortDataDir(File[] files, String prefix, boolean ascending)
-    {
-        List<File> filelist = Arrays.asList(files);
-        Collections.sort(filelist, new DataDirFileComparator(prefix, ascending));
-        return filelist;
-    }
-
-    /**
-     * Find the log file that starts at, or just before, the snapshot. Return
-     * this and all subsequent logs. Results are ordered by zxid of file,
-     * ascending order.
-     *
-     * @param logDirList array of files
-     * @param snapshotZxid return files at, or before this zxid
-     * @return
-     */
-    static File[] getLogFiles(File[] logDirList,long snapshotZxid) {
-        List<File> files = sortDataDir(logDirList, "log", true);
-        long logZxid = 0;
-        // Find the log file that starts before or at the same time as the
-        // zxid of the snapshot
-        for (File f : files) {
-            long fzxid = getZxidFromName(f.getName(), "log");
-            if (fzxid > snapshotZxid) {
-                continue;
-            }
-            if (fzxid > logZxid) {
-                logZxid = fzxid;
-            }
-        }
-        List<File> v=new ArrayList<File>(5);
-        // Apply the logs
-        for (File f : files) {
-            long fzxid = getZxidFromName(f.getName(), "log");
-            if (fzxid < logZxid) {
-                continue;
-            }
-            v.add(f);
-        }
-        return v.toArray(new File[0]);
+    public ZooKeeperServer(FileTxnSnapLog txnLogFactory,DataTreeBuilder treeBuilder) throws IOException {
+        this(txnLogFactory, DEFAULT_TICK_TIME, treeBuilder);
     }
 
     /**
      *  Restore sessions and data
      */
-    private void loadSnapshotAndLogs() throws IOException {
-        long zxid = -1;
-
-        // Find the most recent snapshot
-        List<File> files = sortDataDir(dataDir.listFiles(), "snapshot", false);
-        for (File f : files) {
-            zxid = isValidSnapshot(f);
-            if (zxid == -1) {
-                LOG.warn("Skipping " + f);
-                continue;
-            }
-
-            LOG.warn("Processing snapshot: " + f);
-
-            FileInputStream snapFIS = new FileInputStream(f);
-            try {
-                InputStream snapIS = new BufferedInputStream(snapFIS);
-                try {
-                    loadData(BinaryInputArchive.getArchive(snapIS));
-                } finally {
-                    snapIS.close();
-                }
-            } finally {
-                snapFIS.close();
-            }
-
-            dataTree.lastProcessedZxid = zxid;
-
-            // Apply the logs on/after the selected snapshot
-            File[] logfiles = getLogFiles(dataLogDir.listFiles(), zxid);
-            for (File logfile : logfiles) {
-                LOG.warn("Processing log file: " + logfile);
-
-                InputStream logIS =
-                    new BufferedInputStream(new FileInputStream(logfile));
-                zxid = playLog(BinaryInputArchive.getArchive(logIS));
-                logIS.close();
-            }
-            hzxid = zxid;
-
-            break;
-        }
-
-        if (zxid == -1) {
-            sessionsWithTimeouts = new ConcurrentHashMap<Long, Integer>();
-            dataTree = treeBuilder.build();
-        }
-    }
-
     public void loadData() throws IOException, InterruptedException {
-        loadSnapshotAndLogs();
-
+        PlayBackListener listener=new PlayBackListener(){
+            public void onTxnLoaded(TxnHeader hdr,Record txn){
+                Request r = new Request(null, 0, hdr.getCxid(),hdr.getType(),
+                        null, null);
+                r.txn = txn;
+                r.hdr = hdr;
+                r.zxid = hdr.getZxid();
+                addCommittedProposal(r);
+            }
+        };
+        sessionsWithTimeouts = new ConcurrentHashMap<Long, Integer>();
+        dataTree = treeBuilder.build();
+        long zxid = txnLogFactory.restore(dataTree,sessionsWithTimeouts,listener);
+        this.hzxid = zxid;
         // Clean up dead sessions
         LinkedList<Long> deadSessions = new LinkedList<Long>();
         for (long session : dataTree.getSessions()) {
@@ -369,94 +193,14 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
             killSession(session);
         }
         // Make a clean snapshot
-        snapshot();
-    }
-
-    public void loadData(InputArchive ia) throws IOException {
-        sessionsWithTimeouts = new ConcurrentHashMap<Long, Integer>();
-        dataTree = treeBuilder.build();
-
-        int count = ia.readInt("count");
-        while (count > 0) {
-            long id = ia.readLong("id");
-            int to = ia.readInt("timeout");
-            sessionsWithTimeouts.put(id, to);
-            ZooTrace.logTraceMessage(LOG, ZooTrace.SESSION_TRACE_MASK,
-                                     "loadData --- session in archive: " + id
-                                     + " with timeout: " + to);
-            count--;
-        }
-        dataTree.deserialize(ia, "tree");
-    }
-
-    public long playLog(InputArchive logStream) throws IOException {
-        long highestZxid = 0;
-        try {
-            while (true) {
-                byte[] bytes = logStream.readBuffer("txnEntry");
-                if (bytes.length == 0) {
-                    // Since we preallocate, we define EOF to be an
-                    // empty transaction
-                    throw new EOFException();
-                }
-                InputArchive ia = BinaryInputArchive
-                        .getArchive(new ByteArrayInputStream(bytes));
-                TxnHeader hdr = new TxnHeader();
-                Record txn = deserializeTxn(ia, hdr);
-                if (logStream.readByte("EOR") != 'B') {
-                    LOG.warn("Last transaction was partial.");
-                    throw new EOFException();
-                }
-                if (hdr.getZxid() <= highestZxid && highestZxid != 0) {
-                    LOG.error(highestZxid + "(higestZxid) >= "
-                            + hdr.getZxid() + "(next log) for type "
-                            + hdr.getType());
-                } else {
-                    highestZxid = hdr.getZxid();
-                }
-                switch (hdr.getType()) {
-                case OpCode.createSession:
-                    sessionsWithTimeouts.put(hdr.getClientId(),
-                            ((CreateSessionTxn) txn).getTimeOut());
-                    ZooTrace.logTraceMessage(LOG,
-                                             ZooTrace.SESSION_TRACE_MASK,
-                            "playLog --- create session in log: 0x"
-                                    + Long.toHexString(hdr.getClientId())
-                                    + " with timeout: "
-                                    + ((CreateSessionTxn) txn).getTimeOut());
-                    // give dataTree a chance to sync its lastProcessedZxid
-                    dataTree.processTxn(hdr, txn);
-                    break;
-                case OpCode.closeSession:
-                    sessionsWithTimeouts.remove(hdr.getClientId());
-                    ZooTrace.logTraceMessage(LOG,
-                            ZooTrace.SESSION_TRACE_MASK,
-                            "playLog --- close session in log: 0x"
-                                    + Long.toHexString(hdr.getClientId()));
-                    dataTree.processTxn(hdr, txn);
-                    break;
-                default:
-                    dataTree.processTxn(hdr, txn);
-                }
-                Request r = new Request(null, 0, hdr.getCxid(), hdr.getType(),
-                        null, null);
-                r.txn = txn;
-                r.hdr = hdr;
-                r.zxid = hdr.getZxid();
-                addCommittedProposal(r);
-            }
-        } catch (EOFException e) {
-            // expected in some cases - see comments in try block
-        }
-        return highestZxid;
+        takeSnapshot();
     }
 
     /**
      * maintains a list of last 500 or so committed requests. This is used for
      * fast follower synchronization.
      *
-     * @param request
-     *            committed request
+     * @param request committed request
      */
 
     public void addCommittedProposal(Request request) {
@@ -492,135 +236,9 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
         }
     }
 
-    static public Record deserializeTxn(InputArchive ia, TxnHeader hdr)
-            throws IOException {
-        hdr.deserialize(ia, "hdr");
-        Record txn = null;
-        switch (hdr.getType()) {
-        case OpCode.createSession:
-            // This isn't really an error txn; it just has the same
-            // format. The error represents the timeout
-            txn = new CreateSessionTxn();
-            break;
-        case OpCode.closeSession:
-            return null;
-        case OpCode.create:
-            txn = new CreateTxn();
-            break;
-        case OpCode.delete:
-            txn = new DeleteTxn();
-            break;
-        case OpCode.setData:
-            txn = new SetDataTxn();
-            break;
-        case OpCode.setACL:
-            txn = new SetACLTxn();
-            break;
-        case OpCode.error:
-            txn = new ErrorTxn();
-            break;
-        }
-        if (txn != null) {
-            txn.deserialize(ia, "txn");
-        }
-        return txn;
-    }
-
-    public void truncateLog(long finalZxid) throws IOException {
-        long highestZxid = 0;
-        for (File f : dataDir.listFiles()) {
-            long zxid = isValidSnapshot(f);
-            if (zxid == -1) {
-                LOG.warn("Skipping " + f);
-                continue;
-            }
-            if (zxid > highestZxid) {
-                highestZxid = zxid;
-            }
-        }
-        File[] files = getLogFiles(dataLogDir.listFiles(), highestZxid);
-        boolean truncated = false;
-        for (File f : files) {
-            FileInputStream fin = new FileInputStream(f);
-            InputArchive ia = BinaryInputArchive.getArchive(fin);
-            FileChannel fchan = fin.getChannel();
-            try {
-                while (true) {
-                    byte[] bytes = ia.readBuffer("txtEntry");
-                    if (bytes.length == 0) {
-                        // Since we preallocate, we define EOF to be an
-                        // empty transaction
-                        throw new EOFException();
-                    }
-                    InputArchive iab = BinaryInputArchive
-                            .getArchive(new ByteArrayInputStream(bytes));
-                    TxnHeader hdr = new TxnHeader();
-                    deserializeTxn(iab, hdr);
-                    if (ia.readByte("EOF") != 'B') {
-                        LOG.warn("Last transaction was partial.");
-                        throw new EOFException();
-                    }
-                    if (hdr.getZxid() == finalZxid) {
-                        // this is where we need to truncate
-
-                        long pos = fchan.position();
-                        fin.close();
-                        FileOutputStream fout = new FileOutputStream(f);
-                        FileChannel fchanOut = fout.getChannel();
-                        fchanOut.truncate(pos);
-                        fchanOut.close();
-                        fout.close();
-                        truncated = true;
-                        break;
-                    }
-                }
-            } catch (EOFException eof) {
-                // expected in some cases - see comments in try block
-            } finally {
-                fchan.close();
-                fin.close();
-            }
-            if (truncated == true) {
-                break;
-            }
-        }
-        if (truncated == false) {
-            // not able to truncate the log
-            LOG.error("Not able to truncate the log zxid 0x"
-                    + Long.toHexString(finalZxid));
-            System.exit(13);
-        }
-
-    }
-
-    public void snapshot(BinaryOutputArchive oa) throws IOException,
-            InterruptedException {
-        HashMap<Long, Integer> sessSnap = new HashMap<Long, Integer>(
-                sessionsWithTimeouts);
-        oa.writeInt(sessSnap.size(), "count");
-        for (Entry<Long, Integer> entry : sessSnap.entrySet()) {
-            oa.writeLong(entry.getKey().longValue(), "id");
-            oa.writeInt(entry.getValue().intValue(), "timeout");
-        }
-        dataTree.serialize(oa, "tree");
-    }
-
-    public void snapshot() throws InterruptedException {
-        long lastZxid = dataTree.lastProcessedZxid;
-        ZooTrace.logTraceMessage(LOG, ZooTrace.getTextTraceLevel(),
-                "Snapshotting: zxid 0x" + Long.toHexString(lastZxid));
+    public void takeSnapshot(){
         try {
-            File f = new File(dataDir, "snapshot." + Long.toHexString(lastZxid));
-            OutputStream sessOS = new BufferedOutputStream(new FileOutputStream(f));
-            try {
-                BinaryOutputArchive oa = BinaryOutputArchive.getArchive(sessOS);
-                snapshot(oa);
-                sessOS.flush();
-            } finally {
-                sessOS.close();
-            }
-            ZooTrace.logTraceMessage(LOG, ZooTrace.getTextTraceLevel(),
-                    "Snapshotting finished: zxid 0x" + Long.toHexString(lastZxid));
+            txnLogFactory.save(dataTree, sessionsWithTimeouts);
         } catch (IOException e) {
             LOG.error("Severe error, exiting",e);
             // This is a severe error that we cannot recover from,
@@ -629,6 +247,18 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
         }
     }
 
+    public void serializeSnapshot(OutputArchive oa) throws IOException,
+            InterruptedException {
+        SerializeUtils.serializeSnapshot(dataTree, oa, sessionsWithTimeouts);
+    }
+
+    public void deserializeSnapshot(InputArchive ia) throws IOException {
+        sessionsWithTimeouts = new ConcurrentHashMap<Long, Integer>();
+        dataTree = treeBuilder.build();
+
+        SerializeUtils.deserializeSnapshot(dataTree,ia,sessionsWithTimeouts);
+    }
+
     /**
      * This should be called from a synchronized block on this!
      */
@@ -644,10 +274,6 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
         return System.currentTimeMillis();
     }
 
-    static String getLogName(long zxid) {
-        return "log." + Long.toHexString(zxid);
-    }
-
     public void closeSession(long sessionId) throws InterruptedException {
         ZooTrace.logTraceMessage(LOG, ZooTrace.SESSION_TRACE_MASK,
                                  "ZooKeeperServer --- Session to be closed: 0x"
@@ -691,14 +317,6 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
     }
 
     public void startup() throws IOException, InterruptedException {
-        if (dataDir == null || !dataDir.isDirectory()) {
-            throw new IOException("data directory does not exist: " + dataDir);
-        }
-        if (dataLogDir == null || !dataLogDir.isDirectory()) {
-            throw new IOException("data log directory does not exist: "
-                    + dataLogDir);
-        }
-
         if (dataTree == null) {
             loadData();
         }
@@ -916,14 +534,42 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
         return serverCnxnFactory;
     }
 
+    /**
+     * return the last proceesed id from the 
+     * datatree
+     */
     public long getLastProcessedZxid() {
         return dataTree.lastProcessedZxid;
     }
 
+    /**
+     * return the outstanding requests
+     * in the queue, which havent been 
+     * processed yet
+     */
     public long getOutstandingRequests() {
         return getInProcess();
     }
 
+    /**
+     * trunccate the log to get in sync with others 
+     * if in a quorum
+     * @param zxid the zxid that it needs to get in sync
+     * with others
+     * @throws IOException
+     */
+    public void truncateLog(long zxid) throws IOException {
+        this.txnLogFactory.truncateLog(zxid);
+    }
+    
+    /**
+     * the snapshot and logwriter for this instance
+     * @return
+     */
+    public FileTxnSnapLog getLogWriter() {
+        return this.txnLogFactory;
+    }
+    
     public int getTickTime() {
         return tickTime;
     }
@@ -939,41 +585,7 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
     public void setTreeBuilder(DataTreeBuilder treeBuilder) {
         this.treeBuilder = treeBuilder;
     }
-
-    /**
-     * Gets directory for storing the snapshot
-     */
-    public File getDataDir() {
-        return dataDir;
-    }
-
-    /**
-     * Sets directory for storing the snapshot
-     */
-    public void setDataDir(File dataDir) throws IOException {
-        this.dataDir = dataDir;
-        if (!dataDir.isDirectory()) {
-            throw new IOException("data directory does not exist");
-        }
-    }
-
-    /**
-     * Gets directoy for storing the log tnxns
-     */
-    public File getDataLogDir() {
-        return dataLogDir;
-    }
-
-    /**
-     * Sets directoy for storing the log tnxns
-     */
-    public void setDataLogDir(File dataLogDir) throws IOException {
-        this.dataLogDir = dataLogDir;
-        if (!dataLogDir.isDirectory()) {
-            throw new IOException("data log directory does not exist");
-        }
-    }
-
+    
     public int getClientPort() {
         return clientPort;
     }
@@ -981,4 +593,12 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
     public void setClientPort(int clientPort) {
         this.clientPort = clientPort;
     }
+    
+    public void setTxnLogFactory(FileTxnSnapLog txnLog) {
+        this.txnLogFactory = txnLog;
+    }
+    
+    public FileTxnSnapLog getTxnLogFactory() {
+        return this.txnLogFactory;
+    }
 }

+ 10 - 4
src/java/main/org/apache/zookeeper/server/ZooKeeperServerMain.java

@@ -22,12 +22,14 @@ import java.io.File;
 import java.io.IOException;
 
 import org.apache.log4j.Logger;
+import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
+import org.apache.zookeeper.server.ZooKeeperServer.BasicDataTreeBuilder;
 
 /**
  * This class starts and runs a standalone ZooKeeperServer.
  */
 public class ZooKeeperServerMain {
-    
+
     private static final Logger LOG = Logger.getLogger(ZooKeeperServerMain.class);
 
     /*
@@ -43,11 +45,15 @@ public class ZooKeeperServerMain {
             }
 
             public ZooKeeperServer createServer() throws IOException {
+                // create a file logger url from the command line args
                 ZooKeeperServer zks = new ZooKeeperServer();
-                zks.setDataDir(new File(ServerConfig.getDataDir()));
-                zks.setDataLogDir(new File(ServerConfig.getDataLogDir()));
                 zks.setClientPort(ServerConfig.getClientPort());
-                return zks;
+
+               FileTxnSnapLog ftxn = new FileTxnSnapLog(new 
+                       File(ServerConfig.getDataLogDir()),
+                        new File(ServerConfig.getDataDir()));
+               zks.setTxnLogFactory(ftxn);
+               return zks;
             }
         });
     }

+ 134 - 0
src/java/main/org/apache/zookeeper/server/persistence/FileSnap.java

@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.persistence;
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.jute.BinaryInputArchive;
+import org.apache.jute.BinaryOutputArchive;
+import org.apache.jute.InputArchive;
+import org.apache.jute.OutputArchive;
+import org.apache.zookeeper.server.DataTree;
+import org.apache.zookeeper.server.util.SerializeUtils;
+
+/**
+ * This class implements the snapshot interface.
+ * it is responsible for storing, serializing
+ * and deserializing the right snapshot.
+ * and provides access to the snapshots.
+ */
+public class FileSnap implements SnapShot {
+    File snapDir;
+    private static final int VERSION=2;
+    private static final long dbId=-1;
+    public final static int MAGIC = ByteBuffer.wrap("AK47".getBytes()).getInt();
+    public FileSnap(File snapDir) {
+        this.snapDir = snapDir;
+    }
+    
+    /**
+     * deserialize a data tree from the most recent snapshot
+     * @return the zxid of the snapshot
+     */
+    public long deserialize(DataTree dt, Map<Long, Integer> sessions)
+            throws IOException {
+        File snap = findMostRecentSnapshot();
+        if (snap == null) {
+            return -1L;
+        }
+        InputStream snapIS = new BufferedInputStream(new FileInputStream(snap));
+        InputArchive ia=BinaryInputArchive.getArchive(snapIS);
+        deserialize(dt,sessions,ia);
+        snapIS.close();
+        dt.lastProcessedZxid = Util.getZxidFromName(snap.getName(), "snapshot");
+        return dt.lastProcessedZxid;
+    }
+    
+    /**
+     * deserialize the datatree from an inputarchive
+     * @param dt the datatree to be serialized into
+     * @param sessions the sessions to be filled up
+     * @param ia the input archive to restore from
+     * @throws IOException
+     */
+    protected void deserialize(DataTree dt, Map<Long, Integer> sessions,
+            InputArchive ia) throws IOException {
+        FileHeader header = new FileHeader();
+        header.deserialize(ia, "fileheader");
+        SerializeUtils.deserializeSnapshot(dt,ia,sessions);
+    }
+    
+    /**
+     * find the most recent snapshot in the database.
+     * @return the file containing the most recent snapshot
+     */
+    public File findMostRecentSnapshot() throws IOException {
+        List<File> files = Util.sortDataDir(snapDir.listFiles(), "snapshot", false);
+        for (File f : files) {
+            if(Util.isValidSnapshot(f))
+                return f;
+        }
+        return null;
+    }
+
+    /**
+     * serialize the datatree and sessions
+     * @param dt the datatree to be serialized
+     * @param sessions the sessions to be serialized
+     * @param oa the output archive to serialize into
+     * @param header the header of this snapshot
+     * @throws IOException
+     */
+    protected void serialize(DataTree dt,Map<Long, Integer> sessions,
+            OutputArchive oa, FileHeader header) throws IOException {
+        // this is really a programmatic error and not something that can 
+        // happen at runtime
+        if(header==null)
+            throw new IllegalStateException(
+                    "Snapshot's not open for writing: uninitialized header");
+        header.serialize(oa, "fileheader");
+        SerializeUtils.serializeSnapshot(dt,oa,sessions);
+    }
+    
+    /**
+     * serialize the datatree and session into the file snapshot
+     * @param dt the datatree to be serialized
+     * @param sessions the sessions to be serialized
+     * @param snapShot the file to store snapshot into
+     */
+    public void serialize(DataTree dt, Map<Long, Integer> sessions, File snapShot)
+            throws IOException {
+        OutputStream sessOS = new BufferedOutputStream(new FileOutputStream(snapShot));
+        OutputArchive oa = BinaryOutputArchive.getArchive(sessOS);
+        FileHeader header = new FileHeader(MAGIC, VERSION, dbId);
+        serialize(dt,sessions,oa, header);
+        sessOS.flush();
+        sessOS.close();
+    }
+   
+ }

+ 480 - 0
src/java/main/org/apache/zookeeper/server/persistence/FileTxnLog.java

@@ -0,0 +1,480 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zookeeper.server.persistence;
+
+import java.io.BufferedInputStream;
+import java.io.ByteArrayInputStream;
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.zip.Adler32;
+import java.util.zip.Checksum;
+
+import org.apache.jute.BinaryInputArchive;
+import org.apache.jute.BinaryOutputArchive;
+import org.apache.jute.InputArchive;
+import org.apache.jute.OutputArchive;
+import org.apache.jute.Record;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.server.util.SerializeUtils;
+import org.apache.zookeeper.txn.TxnHeader;
+
+/**
+ * This class implements the TxnLog interface. It provides api's
+ * to access the txnlogs and add entries to it.
+ *
+ */
+public class FileTxnLog implements TxnLog {
+    long lastZxidSeen;
+    volatile FileOutputStream logStream = null;
+    volatile OutputArchive oa;
+    
+    File logDir;
+    public final static int MAGIC = ByteBuffer.wrap("AK47".getBytes()).getInt();
+    public final static int VERSION = 2;
+    private boolean forceSync = true;
+    long dbId;
+    private LinkedList<FileOutputStream> streamsToFlush = new LinkedList<FileOutputStream>();
+    static long preAllocSize =  65536 * 1024; 
+    long currentSize;
+    File logFileWrite = null;
+    
+    private static final Logger LOG = Logger.getLogger(FileTxnLog.class);
+  
+    /**
+     * constructor for FileTxnLog. Take the directory
+     * where the txnlogs are stored
+     * @param logDir the directory where the txnlogs are stored
+     */
+    public FileTxnLog(File logDir) {
+        this.logDir = logDir;
+        forceSync = !System.getProperty("zookeeper.forceSync", "yes").equals(
+            "no");
+        String size = System.getProperty("zookeeper.preAllocSize");
+        if (size != null) {
+            try {
+                preAllocSize = Long.parseLong(size) * 1024;
+            } catch (NumberFormatException e) {
+                LOG.warn(size + " is not a valid value for preAllocSize");
+            }
+        }
+    }
+    
+    /**
+     * method to allow setting preallocate size
+     * of log file to pad the file.
+     * @param size the size to set to
+     */
+    public static void setPreallocSize(long size) {
+        preAllocSize = size;
+    }
+    
+    /**
+     * creates a checksum alogrithm to be used
+     * @return the checksum used for this txnlog
+     */
+    protected Checksum makeChecksumAlgorithm(){
+        return new Adler32();
+    }
+
+
+    /**
+     * rollover the current log file to a new one.
+     */
+    public void rollLog() {
+        this.logStream = null;
+        oa = null;
+    }
+
+    /**
+     * append an entry to the transaction log
+     * @param hdr the header of the transaction
+     * @param txn the transaction part of the entry
+     */
+    public synchronized void append(TxnHeader hdr, Record txn) 
+        throws IOException {
+        if (hdr != null) {
+            if (hdr.getZxid() <= lastZxidSeen) {
+                LOG.warn("Current zxid " + hdr.getZxid()
+                        + " is <= " + lastZxidSeen + " for "
+                        + hdr.getType());
+            }
+           if (logStream==null) {
+               logFileWrite = new File(logDir, ("log." + 
+                       Long.toHexString(hdr.getZxid())));
+               logStream=new FileOutputStream(logFileWrite);
+               oa = BinaryOutputArchive.getArchive(logStream);
+               FileHeader fhdr = new FileHeader(MAGIC,VERSION, dbId);
+               fhdr.serialize(oa, "fileheader");
+               currentSize = logStream.getChannel().position();
+               streamsToFlush.add(logStream);
+            }
+            padFile(logStream);
+            byte[] buf = Util.marshallTxnEntry(hdr, txn);
+            if (buf == null || buf.length == 0) {
+                throw new IOException("Faulty serialization for header " +
+                		"and txn");
+            }
+            Checksum crc = makeChecksumAlgorithm();
+            crc.update(buf, 0, buf.length);
+            oa.writeLong(crc.getValue(), "txnEntryCRC");
+            Util.writeTxnBytes(oa, buf);
+        }    
+    }
+    
+    /**
+     * pad the current file to increase its size
+     * @param out the outputstream to be padded
+     * @throws IOException
+     */
+    private void padFile(FileOutputStream out) throws IOException {
+        currentSize = Util.padLogFile(out, currentSize, preAllocSize);
+    }
+    
+    /**
+     * Find the log file that starts at, or just before, the snapshot. Return
+     * this and all subsequent logs. Results are ordered by zxid of file,
+     * ascending order.
+     * @param logDirList array of files
+     * @param snapshotZxid return files at, or before this zxid
+     * @return
+     */
+    public static File[] getLogFiles(File[] logDirList,long snapshotZxid) {
+        List<File> files = Util.sortDataDir(logDirList, "log", true);
+        long logZxid = 0;
+        // Find the log file that starts before or at the same time as the
+        // zxid of the snapshot
+        for (File f : files) {
+            long fzxid = Util.getZxidFromName(f.getName(), "log");
+            if (fzxid > snapshotZxid) {
+                continue;
+            }
+            // the files 
+            // are sorted with zxid's
+            if (fzxid > logZxid) {
+                logZxid = fzxid;
+            }
+        }
+        List<File> v=new ArrayList<File>(5);
+        for (File f : files) {
+            long fzxid = Util.getZxidFromName(f.getName(), "log");
+            if (fzxid < logZxid) {
+                continue;
+            }
+            v.add(f);
+        }
+        return v.toArray(new File[0]);
+    
+    }
+    
+    /**
+     * get the last zxid that was logged in the transaction logs
+     * @return the last zxid logged in the transaction logs
+     */
+    public long getLastLoggedZxid() {
+        File[] files = getLogFiles(logDir.listFiles(), 0);
+        long maxLog=files.length>0?
+                Util.getZxidFromName(files[files.length-1].getName(),"log"):-1;
+        
+        // if a log file is more recent we must scan it to find 
+        // the highest zxid
+        long zxid = maxLog;
+        FileOutputStream logStream = null;
+        try {
+            FileTxnLog txn = new FileTxnLog(logDir);
+            TxnIterator itr = txn.read(maxLog);
+            while (true) {
+                if(!itr.next())
+                    break;
+                TxnHeader hdr = itr.getHeader();
+                zxid = hdr.getZxid();
+            }
+        } catch (IOException e) {
+            LOG.warn("Unexpected exception", e);
+        } finally {
+            if (logStream != null)
+                try {
+                    logStream.close();
+                } catch(IOException io){}
+        }
+        return zxid;
+    }
+    
+    /**
+     * commit the logs. make sure that evertyhing hits the 
+     * disk
+     */
+    public synchronized void commit() throws IOException {
+        for (FileOutputStream log : streamsToFlush) {
+            log.flush();
+            if (forceSync) {
+                log.getChannel().force(false);
+            }
+        }
+        while (streamsToFlush.size() > 1) {
+            streamsToFlush.removeFirst().close();
+        }
+    }
+    
+    /**
+     * start reading all the transactions from the given zxid
+     * @param zxid the zxid to start reading transactions from
+     * @return returns an iterator to iterate through the transaction
+     * logs
+     */
+    public TxnIterator read(long zxid) throws IOException {
+        return new FileTxnIterator(logDir, zxid);
+    }   
+    
+    /**
+     * truncate the current transaction logs
+     * @param zxid the zxid to truncate the logs to
+     * @return true if successful false if not
+     */
+    public boolean truncate(long zxid) throws IOException {
+        FileTxnIterator itr = new FileTxnIterator(this.logDir, zxid);
+        FileInputStream input = itr.inputStream;
+        long pos = input.getChannel().position();
+        // now, truncate at the current position
+        RandomAccessFile raf=new RandomAccessFile(itr.logFile,"rw");
+        raf.setLength(pos);
+        raf.close();
+        while(itr.goToNextLog()) {
+            itr.logFile.delete();
+        }
+        return true;
+    }
+    
+    /**
+     * read the header of the transaction file
+     * @param file the transaction file to read
+     * @return header that was read fomr the file
+     * @throws IOException
+     */
+    private static FileHeader readHeader(File file) throws IOException {
+        InputStream is =null;
+        try{
+            is = new BufferedInputStream(new FileInputStream(file));
+            InputArchive ia=BinaryInputArchive.getArchive(is);
+            FileHeader hdr = new FileHeader();
+            hdr.deserialize(ia, "fileheader");
+            return hdr;
+         }finally{
+             try{
+                 if(is != null) is.close();
+             }catch(IOException e){
+             }
+         }        
+    }
+    
+    /**
+     * the dbid of this transaction database
+     * @return the dbid of this database
+     */
+    public long getDbId() throws IOException {
+        FileTxnIterator itr = new FileTxnIterator(logDir, 0);
+        FileHeader fh=readHeader(itr.logFile);
+        itr.close();
+        if(fh==null)
+            throw new IOException("Unsupported Format.");
+        return fh.getDbid();
+    }
+    
+    /**
+     * this class implements the txnlog iterator interface 
+     * which is used for reading the transaction logs 
+     */
+    public static class FileTxnIterator implements TxnLog.TxnIterator {
+        File logDir;
+        long zxid;
+        TxnHeader hdr;
+        Record record;
+        File logFile;
+        InputArchive ia;
+        static final String CRC_ERROR="CRC check failed";
+        FileInputStream inputStream=null;
+        //stored files is the list of files greater than 
+        //the zxid we are looking for.
+        private ArrayList<File> storedFiles;
+        
+        /**
+         * create an iterator over a transaction database directory
+         * @param logDir the transaction database directory
+         * @param zxid the zxid to start reading from
+         * @throws IOException
+         */
+        public FileTxnIterator(File logDir, long zxid) throws IOException {
+          this.logDir = logDir;
+          this.zxid = zxid;
+          init();
+        }
+        
+        /**
+         * initialize to the zxid specified
+         * this is inclusive of the zxid 
+         * @throws IOException
+         */
+        void init() throws IOException {
+            storedFiles = new ArrayList<File>();
+            List<File> files = Util.sortDataDir(FileTxnLog.getLogFiles(logDir.listFiles(), 0), "log", false);
+            for (File f: files) {
+                if (Util.getZxidFromName(f.getName(), "log") >= zxid) {
+                    storedFiles.add(f);
+                }
+                // add the last logfile that is less than the zxid
+                else if (Util.getZxidFromName(f.getName(), "log") < zxid) {
+                    storedFiles.add(f);
+                    break;
+                }
+            }
+            goToNextLog();
+            if (!next())
+                return;
+            while (hdr.getZxid() < zxid) {
+                next();
+            }
+        }
+        
+        /**
+         * go to the next logfile 
+         * @return true if there is one and false if there is no 
+         * new file to be read
+         * @throws IOException
+         */
+        private boolean goToNextLog() throws IOException {
+            if (storedFiles.size() > 0) {
+                this.logFile = storedFiles.remove(storedFiles.size()-1);
+                ia = createInputArchive(this.logFile);
+                return true;
+            }
+            return false;
+        }
+        
+        /**
+         * read the header fomr the inputarchive
+         * @param ia the inputarchive to be read from
+         * @param is the inputstream 
+         * @throws IOException
+         */
+        protected void inStreamCreated(InputArchive ia, FileInputStream is) 
+            throws IOException{
+            FileHeader header= new FileHeader();
+            header.deserialize(ia, "fileheader");
+        }
+        
+        /**
+         * Invoked to indicate that the input stream has been created.
+         * @param ia input archive
+         * @param is file input stream associated with the input archive.
+         * @throws IOException
+         **/
+        protected InputArchive createInputArchive(File logFile) throws IOException {
+            if(inputStream==null){
+                inputStream= new FileInputStream(logFile);
+                LOG.info("Created new input stream " + logFile);
+                ia  = BinaryInputArchive.getArchive(new BufferedInputStream(inputStream));
+                inStreamCreated(ia,inputStream);
+                LOG.info("created new input archive " + logFile);
+            }
+            return ia;
+        }
+        
+        /**
+         * create a checksum algorithm 
+         * @return the checksum algorithm
+         */
+        protected Checksum makeChecksumAlgorithm(){
+            return new Adler32();
+        }
+        
+        /**
+         * the iterator that moves to the next transaction
+         * @return true if there is more transactions to be read
+         * false if not.
+         */
+        public boolean next() throws IOException {
+            if (ia == null) {
+                return false;
+            }
+            try {
+                long crcValue = ia.readLong("crcvalue");
+                byte[] bytes = Util.readTxnBytes(ia);
+                // Since we preallocate, we define EOF to be an
+                if (bytes == null || bytes.length==0)
+                   throw new EOFException("Failed to read"); 
+                // EOF or corrupted record
+                // validate CRC
+                Checksum crc = makeChecksumAlgorithm();
+                crc.update(bytes, 0, bytes.length);
+                if (crcValue != crc.getValue()) 
+                    throw new IOException(CRC_ERROR);
+                if (bytes == null || bytes.length == 0)
+                    return false;
+                InputArchive iab = BinaryInputArchive
+                                    .getArchive(new ByteArrayInputStream(bytes));
+                hdr = new TxnHeader();
+                record = SerializeUtils.deserializeTxn(iab, hdr);
+            } catch (EOFException e) {
+                LOG.info("EOF excepton " + e);
+                inputStream.close();
+                inputStream = null;
+                // thsi means that the file has ended 
+                // we shoud go to the next file
+                if (!goToNextLog()) {
+                    return false;
+                }
+            }
+            return true;
+        }
+        
+        /**
+         * reutrn the current header 
+         * @return the current header that 
+         * is read
+         */
+        public TxnHeader getHeader() {
+            return hdr;
+        }
+
+        /**
+         * return the current transaction
+         * @return the current transaction
+         * that is read
+         */
+        public Record getTxn() {
+            return record;
+        }
+        
+        /**
+         * close the iterator 
+         * and release the resources.
+         */
+        public void close() throws IOException {
+            inputStream.close();
+        }
+    }
+
+}

+ 236 - 0
src/java/main/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java

@@ -0,0 +1,236 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.persistence;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.jute.Record;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.ZooDefs.OpCode;
+import org.apache.zookeeper.server.DataTree;
+import org.apache.zookeeper.server.Request;
+import org.apache.zookeeper.server.ZooTrace;
+import org.apache.zookeeper.server.persistence.TxnLog.TxnIterator;
+import org.apache.zookeeper.txn.CreateSessionTxn;
+import org.apache.zookeeper.txn.TxnHeader;
+
+/**
+ * This is a helper class 
+ * above the implementations 
+ * of txnlog and snapshot 
+ * classes
+ */
+public class FileTxnSnapLog {
+    //the direcotry containing the 
+    //the transaction logs
+    File dataDir; 
+    //the directory containing the 
+    //the snapshot directory
+    File snapDir;
+    TxnLog txnLog;
+    SnapShot snapLog;
+    
+    private static final Logger LOG = Logger.getLogger(FileTxnSnapLog.class);
+    
+    /**
+     * This listener helps
+     * the external apis calling
+     * restore to gather information
+     * while the data is being 
+     * restored.
+     */
+    public interface PlayBackListener {
+        void onTxnLoaded(TxnHeader hdr, Record rec);
+    }
+    
+    /**
+     * the constructor which takes the datadir and 
+     * snapdir.
+     * @param dataDir the trasaction directory
+     * @param snapDir the snapshot directory
+     */
+    public FileTxnSnapLog(File dataDir, File snapDir) {
+        this.dataDir = dataDir;
+        this.snapDir = snapDir;
+        txnLog = new FileTxnLog(dataDir);
+        snapLog = new FileSnap(snapDir);
+    }
+    
+    /**
+     * this function restors the server 
+     * database after reading from the 
+     * snapshots and transaction logs
+     * @param dt the datatree to be restored
+     * @param sessions the sessions to be restored
+     * @param listener the playback listener to run on the 
+     * database restoration
+     * @return the highest zxid restored
+     * @throws IOException
+     */
+    public long restore(DataTree dt, Map<Long, Integer> sessions, 
+            PlayBackListener listener) throws IOException {
+        snapLog.deserialize(dt, sessions);
+        FileTxnLog txnLog = new FileTxnLog(dataDir);
+        TxnIterator itr = txnLog.read(dt.lastProcessedZxid);
+        long highestZxid = dt.lastProcessedZxid;
+        TxnHeader hdr;
+        while (true) {
+            // iterator points to 
+            // the first valid txn when initialized
+            hdr = itr.getHeader();
+            if (hdr == null) {
+                //empty logs 
+                return dt.lastProcessedZxid;
+            }
+            if (hdr.getZxid() <= highestZxid && highestZxid != 0) {
+                LOG.error(highestZxid + "(higestZxid) >= "
+                        + hdr.getZxid() + "(next log) for type "
+                        + hdr.getType());
+            } else {
+                highestZxid = hdr.getZxid();
+            }
+            processTransaction(hdr,dt,sessions, itr.getTxn());
+            if (!itr.next()) 
+                break;
+        }
+        return highestZxid;
+    }
+    
+    /**
+     * process the transaction on the datatree
+     * @param hdr the hdr of the transaction
+     * @param dt the datatree to apply transaction to
+     * @param sessions the sessions to be restored
+     * @param txn the transaction to be applied
+     */
+    private void processTransaction(TxnHeader hdr,DataTree dt,
+            Map<Long, Integer> sessions, Record txn){
+        switch (hdr.getType()) {
+        case OpCode.createSession:
+            sessions.put(hdr.getClientId(),
+                    ((CreateSessionTxn) txn).getTimeOut());
+            ZooTrace.logTraceMessage(LOG,ZooTrace.SESSION_TRACE_MASK,
+                    "playLog --- create session in log: "
+                            + Long.toHexString(hdr.getClientId())
+                            + " with timeout: "
+                            + ((CreateSessionTxn) txn).getTimeOut());
+            // give dataTree a chance to sync its lastProcessedZxid
+            dt.processTxn(hdr, txn);
+            break;
+        case OpCode.closeSession:
+            sessions.remove(hdr.getClientId());
+            ZooTrace.logTraceMessage(LOG,ZooTrace.SESSION_TRACE_MASK,
+                    "playLog --- close session in log: "
+                            + Long.toHexString(hdr.getClientId()));
+            dt.processTxn(hdr, txn);
+            break;
+        default:
+            dt.processTxn(hdr, txn);
+        }        
+    }
+    
+    /**
+     * the last logged zxid on the transaction logs
+     * @return the last logged zxid
+     */
+    public long getLastLoggedZxid() {
+        FileTxnLog txnLog = new FileTxnLog(dataDir);
+        return txnLog.getLastLoggedZxid();
+    }
+
+    /**
+     * save the datatree and the sessions into a snapshot
+     * @param dataTree the datatree to be serialized onto disk
+     * @param sessionsWithTimeouts the sesssion timeouts to be
+     * serialized onto disk
+     * @throws IOException
+     */
+    public void save(DataTree dataTree,
+            ConcurrentHashMap<Long, Integer> sessionsWithTimeouts) throws IOException {
+        long lastZxid = dataTree.lastProcessedZxid;
+        LOG.info("Snapshotting: " + Long.toHexString(lastZxid));
+        File snapshot=new File(
+                snapDir, Util.makeSnapshotName(lastZxid));
+        snapLog.serialize(dataTree, sessionsWithTimeouts, snapshot);
+        
+    }
+
+    /**
+     * truncate the transaction logs the zxid
+     * specified
+     * @param zxid the zxid to truncate the logs to
+     * @return true if able to truncate the log, false if not
+     * @throws IOException
+     */
+    public boolean truncateLog(long zxid) throws IOException {
+        FileTxnLog txnLog = new FileTxnLog(dataDir);
+        return txnLog.truncate(zxid);
+    }
+    
+    /**
+     * the most recent snapshot in the snapshot
+     * directory
+     * @return the file that contains the most 
+     * recent snapshot
+     * @throws IOException
+     */
+    public File findMostRecentSnapshot() throws IOException {
+        FileSnap snaplog = new FileSnap(snapDir);
+        return snaplog.findMostRecentSnapshot();
+    }
+
+    /**
+     * get the snapshot logs that are greater than
+     * the given zxid 
+     * @param zxid the zxid that contains logs greater than 
+     * zxid
+     * @return
+     */
+    public File[] getSnapshotLogs(long zxid) {
+        return FileTxnLog.getLogFiles(dataDir.listFiles(), zxid);
+    }
+
+    /**
+     * append the request to the transaction logs
+     * @param si the request to be appended
+     * @throws IOException
+     */
+    public void append(Request si) throws IOException {
+        txnLog.append(si.hdr, si.txn);
+    }
+
+    /**
+     * commit the transaction of logs
+     * @throws IOException
+     */
+    public void commit() throws IOException {
+        txnLog.commit();
+    }
+
+    /**
+     * roll the transaction logs
+     */
+    public void rollLog() {
+        txnLog.rollLog();
+    }
+    
+}

+ 59 - 0
src/java/main/org/apache/zookeeper/server/persistence/SnapShot.java

@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.persistence;
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.zookeeper.server.DataTree;
+
+/**
+ * snapshot interface for the persistence layer.
+ * implement this interface for implementing 
+ * snapshots.
+ */
+public interface SnapShot {
+    
+    /**
+     * deserialize a data tree from the last valid snapshot and 
+     * return the last zxid that was deserialized
+     * @param dt the datatree to be deserialized into
+     * @param sessions the sessions to be deserialized into
+     * @return the last zxid that was deserialized from the snapshot
+     * @throws IOException
+     */
+    long deserialize(DataTree dt, Map<Long, Integer> sessions) 
+        throws IOException;
+    
+    /**
+     * persist the datatree and the sessions into a persistence storage
+     * @param dt the datatree to be serialized
+     * @param sessions 
+     * @throws IOException
+     */
+    void serialize(DataTree dt, Map<Long, Integer> sessions, File name) 
+        throws IOException;
+    
+    /**
+     * find the most recent snapshot file
+     * @return the most recent snapshot file
+     * @throws IOException
+     */
+    File findMostRecentSnapshot() throws IOException;
+} 

+ 115 - 0
src/java/main/org/apache/zookeeper/server/persistence/TxnLog.java

@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.persistence;
+
+import java.io.IOException;
+
+import org.apache.jute.Record;
+import org.apache.zookeeper.txn.TxnHeader;
+
+/**
+ * Interface for reading transaction logs.
+ *
+ */
+public interface TxnLog {
+    
+    /**
+     * roll the current
+     * log being appended to
+     */
+    void rollLog();
+    /**
+     * Append a request to the transaction log
+     * @param hdr the transaction header
+     * @param r the transaction itself
+     * @throws IOException
+     */
+    void append(TxnHeader hdr, Record r) throws IOException;
+
+    /**
+     * Start reading the transaction logs
+     * from a given zxid
+     * @param zxid
+     * @return returns an iterator to read the 
+     * next transaction in the logs.
+     * @throws IOException
+     */
+    TxnIterator read(long zxid) throws IOException;
+    
+    /**
+     * the last zxid of the logged transactions.
+     * @return the last zxid of the logged transactions.
+     * @throws IOException
+     */
+    long getLastLoggedZxid() throws IOException;
+    
+    /**
+     * truncate the log to get in sync with the 
+     * leader.
+     * @param zxid the zxid to truncate at.
+     * @throws IOException 
+     */
+    boolean truncate(long zxid) throws IOException;
+    
+    /**
+     * the dbid for this transaction log. 
+     * @return the dbid for this transaction log.
+     * @throws IOException
+     */
+    long getDbId() throws IOException;
+    
+    /**
+     * commmit the trasaction and make sure
+     * they are persisted
+     * @throws IOException
+     */
+    void commit() throws IOException;
+   
+    /**
+     * an iterating interface for reading 
+     * transaction logs. 
+     */
+    public interface TxnIterator {
+        /**
+         * return the transaction header.
+         * @return return the transaction header.
+         */
+        TxnHeader getHeader();
+        
+        /**
+         * return the transaction record.
+         * @return return the transaction record.
+         */
+        Record getTxn();
+     
+        /**
+         * go to the next transaction record.
+         * @throws IOException
+         */
+        boolean next() throws IOException;
+        
+        /**
+         * close files and release the 
+         * resources
+         * @throws IOException
+         */
+        void close() throws IOException;
+    }
+}
+

+ 336 - 0
src/java/main/org/apache/zookeeper/server/persistence/Util.java

@@ -0,0 +1,336 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.persistence;
+
+import java.io.ByteArrayOutputStream;
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+
+import org.apache.jute.BinaryOutputArchive;
+import org.apache.jute.InputArchive;
+import org.apache.jute.OutputArchive;
+import org.apache.jute.Record;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.txn.TxnHeader;
+
+/**
+ * A collection of utility methods for dealing with file name parsing, 
+ * low level I/O file operations and marshalling/unmarshalling.
+ */
+public class Util {
+    private static final Logger LOG = Logger.getLogger(Util.class);
+    private static final String SNAP_DIR="snapDir";
+    private static final String LOG_DIR="logDir";
+    private static final String DB_FORMAT_CONV="dbFormatConversion";
+    private static final ByteBuffer fill = ByteBuffer.allocateDirect(1);
+    
+    public static String makeURIString(String dataDir, String dataLogDir, 
+            String convPolicy){
+        String uri="file:"+SNAP_DIR+"="+dataDir+";"+LOG_DIR+"="+dataLogDir;
+        if(convPolicy!=null)
+            uri+=";"+DB_FORMAT_CONV+"="+convPolicy;
+        return uri.replace('\\', '/');
+    }
+    /**
+     * Given two directory files the method returns a well-formed 
+     * logfile provider URI. This method is for backward compatibility with the
+     * existing code that only supports logfile persistence and expects these two
+     * parameters passed either on the command-line or in the configuration file.
+     * 
+     * @param dataDir snapshot directory
+     * @param dataLogDir transaction log directory
+     * @return logfile provider URI
+     */
+    public static URI makeFileLoggerURL(File dataDir, File dataLogDir){
+        return URI.create(makeURIString(dataDir.getPath(),dataLogDir.getPath(),null));
+    }
+    
+    public static URI makeFileLoggerURL(File dataDir, File dataLogDir,String convPolicy){
+        return URI.create(makeURIString(dataDir.getPath(),dataLogDir.getPath(),convPolicy));
+    }
+
+    /**
+     * Creates a valid transaction log file name. 
+     * 
+     * @param zxid used as a file name suffix (extention)
+     * @return file name
+     */
+    public static String makeLogName(long zxid) {
+        return "log." + Long.toHexString(zxid);
+    }
+
+    /**
+     * Creates a snapshot file name.
+     * 
+     * @param zxid used as a suffix
+     * @return file name
+     */
+    public static String makeSnapshotName(long zxid) {
+        return "snapshot." + Long.toHexString(zxid);
+    }
+    
+    /**
+     * Extracts snapshot directory property value from the container.
+     * 
+     * @param props properties container
+     * @return file representing the snapshot directory
+     */
+    public static File getSnapDir(Properties props){
+        return new File(props.getProperty(SNAP_DIR));
+    }
+
+    /**
+     * Extracts transaction log directory property value from the container.
+     * 
+     * @param props properties container
+     * @return file representing the txn log directory
+     */
+    public static File getLogDir(Properties props){
+        return new File(props.getProperty(LOG_DIR));
+    }
+    
+    /**
+     * Extracts the value of the dbFormatConversion attribute.
+     * 
+     * @param props properties container
+     * @return value of the dbFormatConversion attribute
+     */
+    public static String getFormatConversionPolicy(Properties props){
+        return props.getProperty(DB_FORMAT_CONV);
+    }
+    
+    /**
+     * The routine parses the scheme-specific part and returns the attributes 
+     * values (if any) as an instance of Properties.
+     * @param uri the persistence provider URI
+     * @return URI attributes
+     */
+    public static Properties parseUrl(URI uri){
+        Properties props=new Properties();        
+        for(String s: uri.getSchemeSpecificPart().split(";")){
+            String[] pair=s.split("=");
+            if(pair.length==2){
+                if(pair[0].equals(SNAP_DIR)){
+                    props.setProperty(SNAP_DIR,pair[1]);
+                }else if(pair[0].equals(LOG_DIR)){
+                    props.setProperty(LOG_DIR,pair[1]);                        
+                }else if(pair[0].equals(DB_FORMAT_CONV)){
+                    props.setProperty(DB_FORMAT_CONV,pair[1]);                        
+                }else{
+                    LOG.warn("Unknown parameter ["+pair[0]+"] ignored");
+                }
+            }
+        }
+        Set<String> pp=props.stringPropertyNames();
+        if(!pp.contains(SNAP_DIR) || !pp.contains(LOG_DIR)){
+            throw new IllegalArgumentException(
+                    "Both snapDir and logDir must be specified");
+        }
+        return props;
+    }
+
+    /**
+     * Extracts zxid from the file name. The file name should have been created
+     * using one of the {@link makeLogName} or {@link makeSnapshotName}.
+     * 
+     * @param name the file name to parse
+     * @param prefix the file name prefix (snapshot or log)
+     * @return zxid
+     */
+    public static long getZxidFromName(String name, String prefix) {
+        long zxid = -1;
+        String nameParts[] = name.split("\\.");
+        if (nameParts.length == 2 && nameParts[0].equals(prefix)) {
+            try {
+                zxid = Long.parseLong(nameParts[1], 16);
+            } catch (NumberFormatException e) {
+            }
+        }
+        return zxid;
+    }
+
+    /**
+     * Verifies that the file is a valid snapshot. Snapshot may be invalid if 
+     * it's incomplete as in a situation when the server dies while in the process
+     * of storing a snapshot. Any file that is not a snapshot is also 
+     * an invalid snapshot. 
+     * 
+     * @param f file to verify
+     * @return true if the snapshot is valid
+     * @throws IOException
+     */
+    public static boolean isValidSnapshot(File f) throws IOException {
+        if (f==null || Util.getZxidFromName(f.getName(), "snapshot") == -1)
+            return false;
+
+        // Check for a valid snapshot
+        RandomAccessFile raf = new RandomAccessFile(f, "r");
+        try {
+            raf.seek(raf.length() - 5);
+            byte bytes[] = new byte[5];
+            raf.read(bytes);
+            ByteBuffer bb = ByteBuffer.wrap(bytes);
+            int len = bb.getInt();
+            byte b = bb.get();
+            if (len != 1 || b != '/') {
+                LOG.info("Invalid snapshot " + f + " len = " + len
+                        + " byte = " + (b & 0xff));
+                return false;
+            }
+        } finally {
+            raf.close();
+        }
+
+        return true;
+    }
+
+    /**
+     * Grows the file to the specified number of bytes. This only happenes if 
+     * the current file position is sufficiently close (less than 4K) to end of 
+     * file. 
+     * 
+     * @param f output stream to pad
+     * @param currentSize application keeps track of the cuurent file size
+     * @param preAllocSize how many bytes to pad
+     * @return the new file size. It can be the same as currentSize if no
+     * padding was done.
+     * @throws IOException
+     */
+    public static long padLogFile(FileOutputStream f,long currentSize,
+            long preAllocSize) throws IOException{
+        long position = f.getChannel().position();
+        if (position + 4096 >= currentSize) {
+            currentSize = currentSize + preAllocSize;
+            fill.position(0);
+            f.getChannel().write(fill, currentSize-fill.remaining());
+        }
+        return currentSize;
+    }
+
+    /**
+     * Reads a transaction entry from the input archive.
+     * @param ia archive to read from
+     * @return null if the entry is corrupted or EOF has been reached; a buffer
+     * (possible empty) containing serialized transaction record.
+     * @throws IOException
+     */
+    public static byte[] readTxnBytes(InputArchive ia) throws IOException {
+        try{
+            byte[] bytes = ia.readBuffer("txtEntry");
+            // Since we preallocate, we define EOF to be an
+            // empty transaction
+            if (bytes.length == 0)
+                return bytes;
+            if (ia.readByte("EOF") != 'B') {
+                LOG.error("Last transaction was partial.");
+                return null;
+            }
+            return bytes;
+        }catch(EOFException e){}
+        return null;
+    }
+    
+
+    /**
+     * Serializes transaction header and transaction data into a byte buffer.
+     *  
+     * @param hdr transaction header
+     * @param txn transaction data
+     * @return serialized transaction record
+     * @throws IOException
+     */
+    public static byte[] marshallTxnEntry(TxnHeader hdr, Record txn)
+            throws IOException {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        OutputArchive boa = BinaryOutputArchive.getArchive(baos);
+
+        hdr.serialize(boa, "hdr");
+        if (txn != null) {
+            txn.serialize(boa, "txn");
+        }
+        return baos.toByteArray();
+    }
+
+    /**
+     * Write the serialized transaction record to the output archive.
+     *  
+     * @param oa output archive
+     * @param bytes serialized trasnaction record
+     * @throws IOException
+     */
+    public static void writeTxnBytes(OutputArchive oa, byte[] bytes)
+            throws IOException {
+        oa.writeBuffer(bytes, "txnEntry");
+        oa.writeByte((byte) 0x42, "EOR"); // 'B'
+    }
+    
+    
+    /**
+     * Compare file file names of form "prefix.version". Sort order result
+     * returned in order of version.
+     */
+    private static class DataDirFileComparator implements Comparator<File> {
+        private String prefix;
+        private boolean ascending;
+        public DataDirFileComparator(String prefix, boolean ascending) {
+            this.prefix = prefix;
+            this.ascending = ascending;
+        }
+
+        public int compare(File o1, File o2) {
+            long z1 = Util.getZxidFromName(o1.getName(), prefix);
+            long z2 = Util.getZxidFromName(o2.getName(), prefix);
+            int result = z1 < z2 ? -1 : (z1 > z2 ? 1 : 0);
+            return ascending ? result : -result;
+        }
+    }
+    
+    /**
+     * Sort the list of files. Recency as determined by the version component
+     * of the file name.
+     *
+     * @param files array of files
+     * @param prefix files not matching this prefix are assumed to have a
+     * version = -1)
+     * @param ascending true sorted in ascending order, false results in
+     * descending order
+     * @return sorted input files
+     */
+    public static List<File> sortDataDir(File[] files, String prefix, boolean ascending)
+    {
+        if(files==null)
+            return new ArrayList<File>(0);
+        List<File> filelist = Arrays.asList(files);
+        Collections.sort(filelist, new DataDirFileComparator(prefix, ascending));
+        return filelist;
+    }
+    
+}

+ 13 - 7
src/java/main/org/apache/zookeeper/server/quorum/Follower.java

@@ -32,18 +32,17 @@ import java.util.HashMap;
 import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
 
-import org.apache.log4j.Logger;
-
 import org.apache.jute.BinaryInputArchive;
 import org.apache.jute.BinaryOutputArchive;
 import org.apache.jute.InputArchive;
 import org.apache.jute.OutputArchive;
 import org.apache.jute.Record;
+import org.apache.log4j.Logger;
 import org.apache.zookeeper.server.Request;
 import org.apache.zookeeper.server.ServerCnxn;
-import org.apache.zookeeper.server.ZooKeeperServer;
 import org.apache.zookeeper.server.ZooTrace;
 import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
+import org.apache.zookeeper.server.util.SerializeUtils;
 import org.apache.zookeeper.txn.TxnHeader;
 
 /**
@@ -174,7 +173,7 @@ public class Follower {
                 else if (qp.getType() == Leader.SNAP) {
                     LOG.info("Getting a snapshot from leader");
                     // The leader is going to dump the database
-                    zk.loadData(leaderIs);
+                    zk.deserializeSnapshot(leaderIs);
                     String signature = leaderIs.readString("signature");
                     if (!signature.equals("BenWasHere")) {
                         LOG.error("Missing signature. Got " + signature);
@@ -184,7 +183,14 @@ public class Follower {
                     //we need to truncate the log to the lastzxid of the leader
                     LOG.warn("Truncating log to get in sync with the leader 0x"
                             + Long.toHexString(qp.getZxid()));
-                    zk.truncateLog(qp.getZxid());
+                    boolean truncated=zk.getLogWriter().truncateLog(qp.getZxid());
+                    if (!truncated) {
+                        // not able to truncate the log
+                        LOG.error("Not able to truncate the log "
+                                + Long.toHexString(qp.getZxid()));
+                        System.exit(13);
+                    }
+
                     zk.loadData();
                 }
                 else {
@@ -218,7 +224,7 @@ public class Follower {
                     TxnHeader hdr = new TxnHeader();
                     BinaryInputArchive ia = BinaryInputArchive
                             .getArchive(new ByteArrayInputStream(qp.getData()));
-                    Record txn = ZooKeeperServer.deserializeTxn(ia, hdr);
+                    Record txn = SerializeUtils.deserializeTxn(ia, hdr);
                     if (hdr.getZxid() != lastQueued + 1) {
                         LOG.warn("Got zxid 0x"
                                 + Long.toHexString(hdr.getZxid())
@@ -232,7 +238,7 @@ public class Follower {
                     zk.commit(qp.getZxid());
                     break;
                 case Leader.UPTODATE:
-                    zk.snapshot();
+                    zk.takeSnapshot();
                     self.cnxnFactory.setZooKeeperServer(zk);
                     break;
                 case Leader.REVALIDATE:

+ 4 - 5
src/java/main/org/apache/zookeeper/server/quorum/FollowerHandler.java

@@ -29,15 +29,14 @@ import java.net.Socket;
 import java.nio.ByteBuffer;
 import java.util.concurrent.LinkedBlockingQueue;
 
-import org.apache.log4j.Logger;
-
 import org.apache.jute.BinaryInputArchive;
 import org.apache.jute.BinaryOutputArchive;
 import org.apache.jute.Record;
+import org.apache.log4j.Logger;
 import org.apache.zookeeper.ZooDefs.OpCode;
-import org.apache.zookeeper.server.ZooKeeperServer;
 import org.apache.zookeeper.server.ZooTrace;
 import org.apache.zookeeper.server.quorum.Leader.Proposal;
+import org.apache.zookeeper.server.util.SerializeUtils;
 import org.apache.zookeeper.txn.TxnHeader;
 
 /**
@@ -138,7 +137,7 @@ public class FollowerHandler extends Thread {
                     .getArchive(new ByteArrayInputStream(p.getData()));
             TxnHeader hdr = new TxnHeader();
             try {
-                txn = ZooKeeperServer.deserializeTxn(ia, hdr);
+                txn = SerializeUtils.deserializeTxn(ia, hdr);
                 // mess = "transaction: " + txn.toString();
             } catch (IOException e) {
                 LOG.warn("Unexpected exception",e);
@@ -250,7 +249,7 @@ public class FollowerHandler extends Thread {
                         + " zxid of leader is 0x"
                         + Long.toHexString(leaderLastZxid));
                 // Dump data to follower
-                leader.zk.snapshot(oa);
+                leader.zk.serializeSnapshot(oa);
                 oa.writeString("BenWasHere", "signature");
             }
             bufferedOutput.flush();

+ 5 - 6
src/java/main/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java

@@ -18,21 +18,20 @@
 
 package org.apache.zookeeper.server.quorum;
 
-import java.io.File;
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
-import org.apache.log4j.Logger;
-
 import org.apache.jute.Record;
+import org.apache.log4j.Logger;
 import org.apache.zookeeper.server.FinalRequestProcessor;
 import org.apache.zookeeper.server.Request;
 import org.apache.zookeeper.server.RequestProcessor;
 import org.apache.zookeeper.server.ServerCnxn;
 import org.apache.zookeeper.server.SyncRequestProcessor;
 import org.apache.zookeeper.server.ZooKeeperServer;
+import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
 import org.apache.zookeeper.txn.TxnHeader;
 
 /**
@@ -61,9 +60,9 @@ public class FollowerZooKeeperServer extends ZooKeeperServer {
      * @param dataDir
      * @throws IOException
      */
-    FollowerZooKeeperServer(File dataDir, File dataLogDir,
-            QuorumPeer self,DataTreeBuilder treeBuilder) throws IOException {
-        super(dataDir, dataLogDir, self.tickTime,treeBuilder);
+    FollowerZooKeeperServer(FileTxnSnapLog logFactory,QuorumPeer self,
+            DataTreeBuilder treeBuilder) throws IOException {
+        super(logFactory, self.tickTime,treeBuilder);
         this.self = self;
         this.pendingSyncs = new ConcurrentLinkedQueue<Request>();
     }

+ 5 - 5
src/java/main/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java

@@ -18,7 +18,6 @@
 
 package org.apache.zookeeper.server.quorum;
 
-import java.io.File;
 import java.io.IOException;
 
 import org.apache.zookeeper.server.FinalRequestProcessor;
@@ -26,6 +25,7 @@ import org.apache.zookeeper.server.PrepRequestProcessor;
 import org.apache.zookeeper.server.RequestProcessor;
 import org.apache.zookeeper.server.SessionTrackerImpl;
 import org.apache.zookeeper.server.ZooKeeperServer;
+import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
 
 /**
  * 
@@ -44,9 +44,9 @@ public class LeaderZooKeeperServer extends ZooKeeperServer {
      * @param dataDir
      * @throws IOException
      */
-    LeaderZooKeeperServer(File dataDir, File dataLogDir,
-            QuorumPeer self,DataTreeBuilder treeBuilder) throws IOException {
-        super(dataDir, dataLogDir, self.tickTime,treeBuilder);
+    LeaderZooKeeperServer(FileTxnSnapLog logFactory,QuorumPeer self,
+            DataTreeBuilder treeBuilder) throws IOException {
+        super(logFactory, self.tickTime,treeBuilder);
         this.self = self;
     }
 
@@ -84,4 +84,4 @@ public class LeaderZooKeeperServer extends ZooKeeperServer {
     public void setZxid(long zxid) {
         hzxid = zxid;
     }
-}
+}

+ 74 - 149
src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java

@@ -17,9 +17,7 @@
  */
 package org.apache.zookeeper.server.quorum;
 
-import java.io.ByteArrayInputStream;
 import java.io.File;
-import java.io.FileInputStream;
 import java.io.IOException;
 import java.net.DatagramPacket;
 import java.net.DatagramSocket;
@@ -28,13 +26,18 @@ import java.net.SocketException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
-
-import org.apache.jute.BinaryInputArchive;
-import org.apache.jute.InputArchive;
 import org.apache.log4j.Logger;
 import org.apache.zookeeper.server.NIOServerCnxn;
 import org.apache.zookeeper.server.ZooKeeperServer;
-import org.apache.zookeeper.txn.TxnHeader;
+import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
+
+import static org.apache.zookeeper.server.quorum.QuorumPeerConfig.getElectionAlg;
+import static org.apache.zookeeper.server.quorum.QuorumPeerConfig.getElectionPort;
+import static org.apache.zookeeper.server.quorum.QuorumPeerConfig.getInitLimit;
+import static org.apache.zookeeper.server.quorum.QuorumPeerConfig.getServerId;
+import static org.apache.zookeeper.server.quorum.QuorumPeerConfig.getServers;
+import static org.apache.zookeeper.server.quorum.QuorumPeerConfig.getSyncLimit;
+import static org.apache.zookeeper.server.quorum.QuorumPeerConfig.getTickTime;
 
 /**
  * This class manages the quorum protocol. There are three states this server
@@ -232,16 +235,6 @@ public class QuorumPeer extends Thread implements QuorumStats.Provider {
         return myQuorumAddr;
     }
 
-    /**
-     * the directory where the snapshot is stored.
-     */
-    private File dataDir;
-
-    /**
-     * the directory where the logs are stored.
-     */
-    private File dataLogDir;
-
     private int electionType;
 
     Election electionAlg;
@@ -249,27 +242,41 @@ public class QuorumPeer extends Thread implements QuorumStats.Provider {
     int electionPort;
 
     NIOServerCnxn.Factory cnxnFactory;
+    private FileTxnSnapLog logFactory = null;
 
-
+    
     public QuorumPeer() {
         super("QuorumPeer");
     }
     
-    public QuorumPeer(ArrayList<QuorumServer> quorumPeers, File dataDir,
-            File dataLogDir, int electionType, int electionPort,long myid, int tickTime,
+    public QuorumPeer(ArrayList<QuorumServer> quorumPeers, File dataDir, File dataLogDir,
+            int electionAlg, int electionPort,long myid, int tickTime,
             int initLimit, int syncLimit,NIOServerCnxn.Factory cnxnFactory) throws IOException {
-        this();
-        this.electionType = electionType;
+        super("QuorumPeer");
         this.cnxnFactory = cnxnFactory;
         this.quorumPeers = quorumPeers;
-        this.dataDir = dataDir;
         this.electionPort = electionPort;
-        this.dataLogDir = dataLogDir;
         this.myid = myid;
         this.tickTime = tickTime;
         this.initLimit = initLimit;
         this.syncLimit = syncLimit;        
+        this.logFactory = new FileTxnSnapLog(dataLogDir, dataDir);
+        QuorumStats.getInstance().setStatsProvider(this);
+    }
+    
+    public QuorumPeer(ArrayList<QuorumServer> quorumPeers, FileTxnSnapLog logFactory,
+            int electionAlg, int electionPort,long myid, int tickTime,
+            int initLimit, int syncLimit,NIOServerCnxn.Factory cnxnFactory) throws IOException {
         
+        super("QuorumPeer");
+        this.cnxnFactory = cnxnFactory;
+        this.quorumPeers = quorumPeers;
+        this.electionPort = electionPort;
+        this.myid = myid;
+        this.tickTime = tickTime;
+        this.initLimit = initLimit;
+        this.syncLimit = syncLimit;        
+        this.logFactory=logFactory;
         QuorumStats.getInstance().setStatsProvider(this);
     }
 
@@ -306,35 +313,35 @@ public class QuorumPeer extends Thread implements QuorumStats.Provider {
             }
         }
         this.electionAlg = createElectionAlgorithm(electionType);
-       
     }
     
     /**
      * This constructor is only used by the existing unit test code.
+     * It defaults to FileLogProvider persistence provider.
      */
-    public QuorumPeer(ArrayList<QuorumServer> quorumPeers, File dataDir,
-            File dataLogDir, int clientPort, int electionAlg, int electionPort,
+    public QuorumPeer(ArrayList<QuorumServer> quorumPeers, File snapDir,
+            File logDir, int clientPort, int electionAlg, int electionPort,
             long myid, int tickTime, int initLimit, int syncLimit) throws IOException {
-        this(quorumPeers,dataDir,dataLogDir,electionAlg,electionPort,myid,tickTime,
-                initLimit,syncLimit,new NIOServerCnxn.Factory(clientPort));
+        this(quorumPeers,
+                new FileTxnSnapLog(snapDir,logDir),
+                electionAlg,electionPort,myid,tickTime,initLimit,syncLimit,
+                new NIOServerCnxn.Factory(clientPort));
+    }
+    
+    public long getLastLoggedZxid(){
+        return logFactory.getLastLoggedZxid();
     }
-
     public Follower follower;
     public Leader leader;
 
-    private int clientPort;
-
-    protected Follower makeFollower(File dataDir,File dataLogDir) throws IOException {
-        FollowerZooKeeperServer zks = new FollowerZooKeeperServer(dataDir, dataLogDir, this,new ZooKeeperServer.BasicDataTreeBuilder());
-        zks.setClientPort(clientPort);
-        return new Follower(this, zks);
+    protected Follower makeFollower(FileTxnSnapLog logFactory) throws IOException {
+        return new Follower(this, new FollowerZooKeeperServer(logFactory, 
+                this,new ZooKeeperServer.BasicDataTreeBuilder()));
     }
-
-    protected Leader makeLeader(File dataDir,File dataLogDir) throws IOException {
-        LeaderZooKeeperServer zks = new LeaderZooKeeperServer(dataDir, dataLogDir,
-                this,new ZooKeeperServer.BasicDataTreeBuilder());
-        zks.setClientPort(clientPort);
-        return new Leader(this, zks);
+     
+    protected Leader makeLeader(FileTxnSnapLog logFactory) throws IOException {
+        return new Leader(this, new LeaderZooKeeperServer(logFactory,
+                this,new ZooKeeperServer.BasicDataTreeBuilder()));
     }
 
     private Election createElectionAlgorithm(int electionAlgorithm){
@@ -400,7 +407,7 @@ public class QuorumPeer extends Thread implements QuorumStats.Provider {
             case FOLLOWING:
                 try {
                     LOG.info("FOLLOWING");
-                    setFollower(makeFollower(dataDir,dataLogDir));
+                    setFollower(makeFollower(logFactory));
                     follower.followLeader();
                 } catch (Exception e) {
                     LOG.warn("Unexpected exception",e);
@@ -413,7 +420,7 @@ public class QuorumPeer extends Thread implements QuorumStats.Provider {
             case LEADING:
                 LOG.info("LEADING");
                 try {
-                    setLeader(makeLeader(dataDir,dataLogDir));
+                    setLeader(makeLeader(logFactory));
                     leader.lead();
                     setLeader(null);
                 } catch (Exception e) {
@@ -443,68 +450,6 @@ public class QuorumPeer extends Thread implements QuorumStats.Provider {
         udpSocket.close();
     }
 
-    long getLastLoggedZxid() {
-        File[] list = dataLogDir.listFiles();
-        if (list == null) {
-            return 0;
-        }
-        long maxLog = -1;
-        long maxSnapShot = 0;
-        for (File f : list) {
-            String name = f.getName();
-            if (name.startsWith("log.")) {
-                long zxid = ZooKeeperServer.getZxidFromName(f.getName(), "log");
-                if (zxid > maxLog) {
-                    maxLog = zxid;
-                }
-            } else if (name.startsWith("snapshot.")) {
-                long zxid = ZooKeeperServer.getZxidFromName(f.getName(),
-                        "snapshot");
-                if (zxid > maxLog) {
-                    maxSnapShot = zxid;
-                }
-            }
-        }
-        if (maxSnapShot > maxLog) {
-            return maxSnapShot;
-        }
-        long zxid = maxLog;
-        FileInputStream logStream = null;
-        try {
-            logStream = new FileInputStream(new File(dataLogDir, "log."
-                    + Long.toHexString(maxLog)));
-            BinaryInputArchive ia = BinaryInputArchive.getArchive(logStream);
-            while (true) {
-                byte[] bytes = ia.readBuffer("txnEntry");
-                if (bytes.length == 0) {
-                    // Since we preallocate, we define EOF to be an
-                    // empty transaction
-                    break;
-                }
-                int B = ia.readByte("EOR");
-                if (B != 'B') {
-                    break;
-                }
-                InputArchive bia = BinaryInputArchive
-                        .getArchive(new ByteArrayInputStream(bytes));
-                TxnHeader hdr = new TxnHeader();
-                hdr.deserialize(bia, "hdr");
-                zxid = hdr.getZxid();
-            }
-        } catch (IOException e) {
-            LOG.warn("Unexpected exception", e);
-        } finally {
-            try {
-                if (logStream != null) {
-                    logStream.close();
-                }
-            } catch (IOException e) {
-                LOG.warn("Unexpected exception",e);
-            }
-        }
-        return zxid;
-    }
-
     public String[] getQuorumPeers() {
         List<String> l = new ArrayList<String>();
         synchronized (this) {
@@ -538,6 +483,20 @@ public class QuorumPeer extends Thread implements QuorumStats.Provider {
         return QuorumStats.Provider.UNKNOWN_STATE;
     }
 
+
+    public NIOServerCnxn.Factory getCnxnFactory() {
+        return cnxnFactory;
+    }
+
+    public void setCnxnFactory(NIOServerCnxn.Factory cnxnFactory) {
+        this.cnxnFactory = cnxnFactory;
+    }
+
+    public void setQuorumPeers(ArrayList<QuorumServer> quorumPeers) {
+        this.quorumPeers = quorumPeers;
+    }
+
+
     /**
      * get the id of this quorum peer.
      */
@@ -596,34 +555,6 @@ public class QuorumPeer extends Thread implements QuorumStats.Provider {
         this.syncLimit = syncLimit;
     }
 
-    /**
-     * Get the directory where the snapshot is stored.
-     */
-    public File getDataDir() {
-        return dataDir;
-    }
-
-    /**
-     * Set the directory where the snapshot is stored.
-     */
-    public void setDataDir(File dataDir) {
-        this.dataDir = dataDir;
-    }
-
-    /**
-     * Get the directory where the logs are stored.
-     */
-    public File getDataLogDir() {
-        return dataLogDir;
-    }
-
-    /**
-     * Set the directory where the logs are stored.
-     */
-    public void setDataLogDir(File dataLogDir) {
-        this.dataLogDir = dataLogDir;
-    }
-
     /**
      * Gets the election port
      */
@@ -651,25 +582,19 @@ public class QuorumPeer extends Thread implements QuorumStats.Provider {
     public void setElectionPort(int electionPort) {
         this.electionPort = electionPort;
     }
-
-    public NIOServerCnxn.Factory getCnxnFactory() {
-        return cnxnFactory;
-    }
-
-    public void setCnxnFactory(NIOServerCnxn.Factory cnxnFactory) {
-        this.cnxnFactory = cnxnFactory;
-    }
-
-    public void setQuorumPeers(ArrayList<QuorumServer> quorumPeers) {
-        this.quorumPeers = quorumPeers;
-    }
-
+    
     public int getClientPort() {
-        return clientPort;
+        return -1;
     }
 
     public void setClientPort(int clientPort) {
-        this.clientPort = clientPort;
     }
-
+ 
+    public void setTxnFactory(FileTxnSnapLog factory) {
+        this.logFactory = factory;
+    }
+    
+    public FileTxnSnapLog getTxnFactory() {
+        return this.logFactory;
+    }
 }

+ 5 - 6
src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerMain.java

@@ -26,6 +26,7 @@ import org.apache.log4j.Logger;
 import org.apache.zookeeper.server.NIOServerCnxn;
 import org.apache.zookeeper.server.ServerConfig;
 import org.apache.zookeeper.server.ZooKeeperServerMain;
+import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
 
 /**
  * 
@@ -69,13 +70,12 @@ public class QuorumPeerMain {
         QuorumPeerConfig.parse(args);
         if (!QuorumPeerConfig.isStandalone()) {
             runPeer(new QuorumPeer.Factory() {
-                public QuorumPeer create(NIOServerCnxn.Factory cnxnFactory)
-                        throws IOException {
-                    
+                public QuorumPeer create(NIOServerCnxn.Factory cnxnFactory) throws IOException {
                     QuorumPeer peer = new QuorumPeer();
                     peer.setClientPort(ServerConfig.getClientPort());
-                    peer.setDataDir(new File(ServerConfig.getDataDir()));
-                    peer.setDataLogDir(new File(ServerConfig.getDataLogDir()));
+                    peer.setTxnFactory(new FileTxnSnapLog(
+                                new File(QuorumPeerConfig.getDataLogDir()), 
+                                new File(QuorumPeerConfig.getDataDir())));
                     peer.setQuorumPeers(QuorumPeerConfig.getServers());
                     peer.setElectionPort(QuorumPeerConfig.getElectionPort());
                     peer.setElectionType(QuorumPeerConfig.getElectionAlg());
@@ -85,7 +85,6 @@ public class QuorumPeerMain {
                     peer.setSyncLimit(QuorumPeerConfig.getSyncLimit());
                     peer.setCnxnFactory(cnxnFactory);
                     return peer;
-                    
                 }
                 public NIOServerCnxn.Factory createConnectionFactory() throws IOException {
                     return new NIOServerCnxn.Factory(getClientPort());

+ 105 - 0
src/java/main/org/apache/zookeeper/server/util/SerializeUtils.java

@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.util;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.log4j.Logger;
+
+import org.apache.jute.InputArchive;
+import org.apache.jute.OutputArchive;
+import org.apache.jute.Record;
+import org.apache.zookeeper.ZooDefs.OpCode;
+import org.apache.zookeeper.server.DataTree;
+import org.apache.zookeeper.server.ZooTrace;
+import org.apache.zookeeper.txn.CreateSessionTxn;
+import org.apache.zookeeper.txn.CreateTxn;
+import org.apache.zookeeper.txn.DeleteTxn;
+import org.apache.zookeeper.txn.ErrorTxn;
+import org.apache.zookeeper.txn.SetACLTxn;
+import org.apache.zookeeper.txn.SetDataTxn;
+import org.apache.zookeeper.txn.TxnHeader;
+
+public class SerializeUtils {
+    private static final Logger LOG = Logger.getLogger(SerializeUtils.class);
+    
+    public static Record deserializeTxn(InputArchive ia, TxnHeader hdr)
+            throws IOException {
+        hdr.deserialize(ia, "hdr");
+        Record txn = null;
+        switch (hdr.getType()) {
+        case OpCode.createSession:
+            // This isn't really an error txn; it just has the same
+            // format. The error represents the timeout
+            txn = new CreateSessionTxn();
+            break;
+        case OpCode.closeSession:
+            return null;
+        case OpCode.create:
+            txn = new CreateTxn();
+            break;
+        case OpCode.delete:
+            txn = new DeleteTxn();
+            break;
+        case OpCode.setData:
+            txn = new SetDataTxn();
+            break;
+        case OpCode.setACL:
+            txn = new SetACLTxn();
+            break;
+        case OpCode.error:
+            txn = new ErrorTxn();
+            break;
+        }
+        if (txn != null) {
+            txn.deserialize(ia, "txn");
+        }
+        return txn;
+    }
+
+    public static void deserializeSnapshot(DataTree dt,InputArchive ia,
+            Map<Long, Integer> sessions) throws IOException {
+        int count = ia.readInt("count");
+        while (count > 0) {
+            long id = ia.readLong("id");
+            int to = ia.readInt("timeout");
+            sessions.put(id, to);
+            ZooTrace.logTraceMessage(LOG, ZooTrace.SESSION_TRACE_MASK,
+                                     "loadData --- session in archive: " + id
+                                     + " with timeout: " + to);
+            count--;
+        }
+        dt.deserialize(ia, "tree");
+    }
+
+    public static void serializeSnapshot(DataTree dt,OutputArchive oa,
+            Map<Long, Integer> sessions) throws IOException {
+        HashMap<Long, Integer> sessSnap = new HashMap<Long, Integer>(sessions);
+        oa.writeInt(sessSnap.size(), "count");
+        for (Entry<Long, Integer> entry : sessSnap.entrySet()) {
+            oa.writeLong(entry.getKey().longValue(), "id");
+            oa.writeInt(entry.getValue().intValue(), "timeout");
+        }
+        dt.serialize(oa, "tree");
+    }
+
+}

+ 56 - 0
src/java/test/org/apache/zookeeper/server/DataTreeUnitTest.java

@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server;
+
+import junit.framework.TestCase;
+
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.data.Stat;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.server.DataTree;
+
+public class DataTreeUnitTest extends TestCase {
+    DataTree dt;
+    
+    public void setUp() throws Exception {
+        dt=new DataTree();
+    }
+
+    public void tearDown() throws Exception {
+        dt=null;
+    }
+
+   
+    public void testRootWatchTriggered() throws Exception {
+        class MyWatcher implements Watcher{
+            boolean fired=false;
+            public void process(WatchedEvent event) {
+                if(event.getPath().equals("/"))
+                    fired=true;
+            }
+        };
+        MyWatcher watcher=new MyWatcher();
+        // set a watch on the root node
+        dt.getChildren("/", new Stat(), watcher);
+        // add a new node, should trigger a watch
+        dt.createNode("/xyz", new byte[0], null, 0, 1, 1);
+        assertFalse("Root node watch not triggered",!watcher.fired);
+    }
+
+}

+ 6 - 3
src/java/test/org/apache/zookeeper/server/ZooKeeperServerTest.java

@@ -21,6 +21,9 @@ package org.apache.zookeeper.server;
 import java.io.File;
 import java.util.List;
 
+import org.apache.zookeeper.server.persistence.FileTxnLog;
+import org.apache.zookeeper.server.persistence.Util;
+
 import junit.framework.TestCase;
 
 public class ZooKeeperServerTest extends TestCase {
@@ -35,7 +38,7 @@ public class ZooKeeperServerTest extends TestCase {
 
         File[] orig = files.clone();
 
-        List<File> filelist = ZooKeeperServer.sortDataDir(files, "foo", true);
+        List<File> filelist = Util.sortDataDir(files, "foo", true);
 
         assertEquals(orig[2], filelist.get(0));
         assertEquals(orig[3], filelist.get(1));
@@ -55,7 +58,7 @@ public class ZooKeeperServerTest extends TestCase {
 
         File[] orig = files.clone();
 
-        List<File> filelist = ZooKeeperServer.sortDataDir(files, "foo", false);
+        List<File> filelist = Util.sortDataDir(files, "foo", false);
 
         assertEquals(orig[4], filelist.get(0));
         assertEquals(orig[1], filelist.get(1));
@@ -76,7 +79,7 @@ public class ZooKeeperServerTest extends TestCase {
         File[] orig = files.clone();
 
         File[] filelist =
-            ZooKeeperServer.getLogFiles(files,
+                FileTxnLog.getLogFiles(files,
                 Long.parseLong("10027c6de", 16));
 
         assertEquals(3, filelist.length);

+ 2 - 2
src/java/test/org/apache/zookeeper/test/ClientBase.java

@@ -38,8 +38,8 @@ import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.Watcher.Event.KeeperState;
 import org.apache.zookeeper.server.NIOServerCnxn;
 import org.apache.zookeeper.server.ServerStats;
-import org.apache.zookeeper.server.SyncRequestProcessor;
 import org.apache.zookeeper.server.ZooKeeperServer;
+import org.apache.zookeeper.server.persistence.FileTxnLog;
 
 public abstract class ClientBase extends TestCase {
     protected static final Logger LOG = Logger.getLogger(ClientBase.class);
@@ -247,7 +247,7 @@ public abstract class ClientBase extends TestCase {
         // resulting in test failure (client timeout on first session).
         // set env and directly in order to handle static init/gc issues
         System.setProperty("zookeeper.preAllocSize", "100");
-        SyncRequestProcessor.setPreAllocSize(100);
+        FileTxnLog.setPreallocSize(100);
     }
     
     @Override

+ 1 - 2
src/java/test/org/apache/zookeeper/test/RecoveryTest.java

@@ -25,7 +25,7 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
 import junit.framework.TestCase;
-
+import java.io.IOException;
 import org.apache.log4j.Logger;
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.Watcher.Event.KeeperState;
@@ -110,7 +110,6 @@ public class RecoveryTest extends TestCase implements Watcher {
             }
 
             f.shutdown();
-
             assertTrue("waiting for server down",
                        ClientBase.waitForServerDown(HOSTPORT,
                                           CONNECTION_TIMEOUT));

+ 8 - 0
src/zookeeper.jute

@@ -161,6 +161,14 @@ module org.apache.zookeeper.server.quorum {
     }
 }
 
+module org.apache.zookeeper.server.persistence {
+    class FileHeader {
+        int magic;
+        int version;
+        long dbid;
+    }
+}
+
 module org.apache.zookeeper.txn {
     class TxnHeader {
         long clientId;