ZOOKEEPER-2574: PurgeTxnLog can inadvertently delete required txn log files

… files

This fix includes patch from Ed Rowe for ZOOKEEPER-2420, which is the same
issue as ZOOKEEPER-2574.

Author: Abhishek Rai <abhishek@thoughtspot.com>

Reviewers: Rakesh Radhakrishnan <rakeshr@apache.org>, Arshad Mohammad <mohammad.arshad@huawei.com>, Abraham Fine <abe@abrahamfine.com>, Allan Lyu <lvfangmin@gmail.com>, Michael Han <hanm@apache.org>

Closes #111 from abhishekrai/ZOOKEEPER-2574
Abhishek Rai 8 years ago
Parent
Current commit
762f4af65b

+ 22 - 9
docs/zookeeperAdmin.html

@@ -940,10 +940,14 @@ server.3=zoo3:2888:3888</pre>
           of the znodes stored by a particular serving ensemble. These
           are the snapshot and transactional log files. As changes are
           made to the znodes these changes are appended to a
-          transaction log, occasionally, when a log grows large, a
+          transaction log. Occasionally, when a log grows large, a
           snapshot of the current state of all znodes will be written
-          to the filesystem. This snapshot supercedes all previous
-          logs.
+          to the filesystem and a new transaction log file is created
+          for future transactions. During snapshotting, ZooKeeper may
+          continue appending incoming transactions to the old log file.
+          Therefore, some transactions which are newer than a snapshot
+          may be found in the last transaction log preceding the
+          snapshot.
         </p>
 <p>A ZooKeeper server <strong>will not remove
         old snapshots and log files</strong> when using the default
@@ -1201,8 +1205,10 @@ server.3=zoo3:2888:3888</pre>
 <p>(Java system property: <strong>zookeeper.snapCount</strong>)</p>
 <p>ZooKeeper logs transactions to a transaction
               log. After snapCount transactions are written to a log
-              file a snapshot is started and a new transaction log
-              file is created. The default snapCount is
+              file a snapshot is started. It also influences rollover
+              of the current transaction log to a new file. However,
+              the creation of a new snapshot and rollover of transaction
+              log proceed independently. The default snapCount is
               100,000.</p>
 </dd>
 
@@ -2344,8 +2350,11 @@ server.3=zoo3:2888:3888</pre>
 <p>The Log Directory contains the ZooKeeper transaction logs.
         Before any update takes place, ZooKeeper ensures that the transaction
         that represents the update is written to non-volatile storage. A new
-        log file is started each time a snapshot is begun. The log file's
-        suffix is the first zxid written to that log.</p>
+        log file is started when the number of transactions written to the
+		current log file reaches a (variable) threshold. The threshold is
+		computed using the same parameter which influences the frequency of
+		snapshotting (see snapCount above). The log file's suffix is the first
+		zxid written to that log.</p>
 <a name="sc_filemanagement"></a>
 <h4>File Management</h4>
 <p>The format of snapshot and log files does not change between
@@ -2360,8 +2369,12 @@ server.3=zoo3:2888:3888</pre>
 <p>The ZooKeeper server creates snapshot and log files, but
         never deletes them. The retention policy of the data and log
         files is implemented outside of the ZooKeeper server. The
-        server itself only needs the latest complete fuzzy snapshot
-        and the log files from the start of that snapshot. See the
+        server itself only needs the latest complete fuzzy snapshot, all log
+		files following it, and the last log file preceding it.  The latter
+		requirement is necessary to include updates which happened after this
+		snapshot was started but went into the existing log file at that time.
+		This is possible because snapshotting and rolling over of logs
+		proceed somewhat independently in Zookeeper. See the
         <a href="#sc_maintenance">maintenance</a> section in
         this document for more details on setting a retention policy
         and maintenance of ZooKeeper storage.
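
The "(variable) threshold" mentioned in the documentation change above can be pictured with a small sketch. This is only an illustration: the class RollThresholdSketch, its method names, and the specific formula (half of snapCount plus a random offset below half of snapCount) are assumptions made for the example and are not taken from this commit.

```java
import java.util.Random;

/** Hypothetical illustration of a randomized rollover threshold; not ZooKeeper code. */
final class RollThresholdSketch {

    /**
     * Assumed formula: a value in [snapCount/2, snapCount), so that servers
     * configured with the same snapCount do not all roll over at the same time.
     */
    static int nextThreshold(int snapCount, Random random) {
        if (snapCount < 2) {
            throw new IllegalArgumentException("snapCount must be at least 2");
        }
        int half = snapCount / 2;
        return half + random.nextInt(half);
    }

    /** True once the current log holds more transactions than the threshold. */
    static boolean shouldRoll(int txnsInCurrentLog, int threshold) {
        return txnsInCurrentLog > threshold;
    }
}
```

With the default snapCount of 100,000 this sketch would roll the log somewhere between 50,000 and 100,000 transactions, which is one way to read "variable" in the text above.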

+ 43 - 8
src/java/main/org/apache/zookeeper/server/PurgeTxnLog.java

@@ -24,10 +24,14 @@ import java.io.IOException;
 import java.text.DateFormat;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 
 import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
 import org.apache.zookeeper.server.persistence.Util;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * this class is used to clean up the 
@@ -38,6 +42,7 @@ import org.apache.zookeeper.server.persistence.Util;
  * and the corresponding logs.
  */
 public class PurgeTxnLog {
+    private static final Logger LOG = LoggerFactory.getLogger(PurgeTxnLog.class);
 
     private static final String COUNT_ERR_MSG = "count should be greater than or equal to 3";
 
@@ -72,18 +77,43 @@ public class PurgeTxnLog {
         FileTxnSnapLog txnLog = new FileTxnSnapLog(dataDir, snapDir);
 
         List<File> snaps = txnLog.findNRecentSnapshots(num);
-        retainNRecentSnapshots(txnLog, snaps);
+        int numSnaps = snaps.size();
+        if (numSnaps > 0) {
+            purgeOlderSnapshots(txnLog, snaps.get(numSnaps - 1));
+        }
     }
 
     // VisibleForTesting
-    static void retainNRecentSnapshots(FileTxnSnapLog txnLog, List<File> snaps) {
-        // found any valid recent snapshots?
-        if (snaps.size() == 0)
-            return;
-        File snapShot = snaps.get(snaps.size() -1);
+    static void purgeOlderSnapshots(FileTxnSnapLog txnLog, File snapShot) {
         final long leastZxidToBeRetain = Util.getZxidFromName(
                 snapShot.getName(), PREFIX_SNAPSHOT);
 
+        /**
+         * We delete all files with a zxid in their name that is less than leastZxidToBeRetain.
+         * This rule applies to both snapshot files as well as log files, with the following
+         * exception for log files.
+         *
+         * A log file with zxid less than X may contain transactions with zxid larger than X.  More
+         * precisely, a log file named log.(X-a) may contain transactions newer than snapshot.X if
+         * there are no other log files with starting zxid in the interval (X-a, X].  Assuming the
+         * latter condition is true, log.(X-a) must be retained to ensure that snapshot.X is
+         * recoverable.  In fact, this log file may very well extend beyond snapshot.X to newer
+         * snapshot files if these newer snapshots were not accompanied by log rollover (possible in
+         * the learner state machine at the time of this writing).  We can make more precise
+         * determination of whether log.(leastZxidToBeRetain-a) for the smallest 'a' is actually
+         * needed or not (e.g. not needed if there's a log file named log.(leastZxidToBeRetain+1)),
+         * but the complexity quickly adds up with gains only in uncommon scenarios.  It's safe and
+         * simple to just preserve log.(leastZxidToBeRetain-a) for the smallest 'a' to ensure
+         * recoverability of all snapshots being retained.  We determine that log file here by
+         * calling txnLog.getSnapshotLogs().
+         */
+        final Set<File> retainedTxnLogs = new HashSet<File>();
+        retainedTxnLogs.addAll(Arrays.asList(txnLog.getSnapshotLogs(leastZxidToBeRetain)));
+
+        /**
+         * Finds all candidates for deletion, which are files with a zxid in their name that is less
+         * than leastZxidToBeRetain.  There's an exception to this rule, as noted above.
+         */
         class MyFileFilter implements FileFilter{
             private final String prefix;
             MyFileFilter(String prefix){
@@ -92,6 +122,9 @@ public class PurgeTxnLog {
             public boolean accept(File f){
                 if(!f.getName().startsWith(prefix + "."))
                     return false;
+                if (retainedTxnLogs.contains(f)) {
+                    return false;
+                }
                 long fZxid = Util.getZxidFromName(f.getName(), prefix);
                 if (fZxid >= leastZxidToBeRetain) {
                     return false;
@@ -115,9 +148,11 @@ public class PurgeTxnLog {
         // remove the old files
         for(File f: files)
         {
-            System.out.println("Removing file: "+
+            final String msg = "Removing file: "+
                 DateFormat.getDateTimeInstance().format(f.lastModified())+
-                "\t"+f.getPath());
+                "\t"+f.getPath();
+            LOG.info(msg);
+            System.out.println(msg);
             if(!f.delete()){
                 System.err.println("Failed to remove "+f.getPath());
             }
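
The retention rule described in the comment added to purgeOlderSnapshots can be sketched on plain file names. This is a minimal illustration under stated assumptions, not the code from this commit: PurgeRuleSketch, zxidFromName and purgeCandidates are hypothetical names, and files are represented as strings such as "log.1a" or "snapshot.2" with a hexadecimal zxid suffix.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration of the purge rule; not ZooKeeper code. */
final class PurgeRuleSketch {

    /** Parses the hex zxid suffix of "<prefix>.<hex>"; returns -1 if the name does not match. */
    static long zxidFromName(String name, String prefix) {
        String[] parts = name.split("\\.");
        if (parts.length != 2 || !parts[0].equals(prefix)) {
            return -1L;
        }
        try {
            long zxid = Long.parseLong(parts[1], 16);
            return zxid < 0 ? -1L : zxid;
        } catch (NumberFormatException e) {
            return -1L;
        }
    }

    /** Returns the names that may be deleted when snapshot.<leastZxidToRetain> is the oldest one kept. */
    static List<String> purgeCandidates(List<String> names, long leastZxidToRetain) {
        // The newest log starting at or before the oldest retained snapshot must be kept,
        // because it may contain transactions newer than that snapshot.
        long precedingLogZxid = -1L;
        for (String name : names) {
            long zxid = zxidFromName(name, "log");
            if (zxid != -1L && zxid <= leastZxidToRetain && zxid > precedingLogZxid) {
                precedingLogZxid = zxid;
            }
        }
        List<String> purge = new ArrayList<>();
        for (String name : names) {
            long snapZxid = zxidFromName(name, "snapshot");
            long logZxid = zxidFromName(name, "log");
            if (snapZxid != -1L && snapZxid < leastZxidToRetain) {
                purge.add(name); // older snapshot
            } else if (logZxid != -1L && logZxid < leastZxidToRetain && logZxid != precedingLogZxid) {
                purge.add(name); // older log that is not the preceding log
            }
        }
        return purge;
    }
}
```

For example, with the names log.1, snapshot.2, log.3, snapshot.4, log.5, snapshot.6 and snapshot.4 as the oldest retained snapshot, the sketch selects log.1 and snapshot.2 for deletion and keeps log.3, which may hold transactions newer than snapshot.4.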

+ 7 - 5
src/java/main/org/apache/zookeeper/server/persistence/FileSnap.java

@@ -177,19 +177,21 @@ public class FileSnap implements SnapShot {
     /**
      * find the last n snapshots. this does not have
      * any checks if the snapshot might be valid or not
-     * @param the number of most recent snapshots 
+     * @param the number of most recent snapshots
      * @return the last n snapshots
      * @throws IOException
      */
     public List<File> findNRecentSnapshots(int n) throws IOException {
         List<File> files = Util.sortDataDir(snapDir.listFiles(), "snapshot", false);
-        int i = 0;
+        int count = 0;
         List<File> list = new ArrayList<File>();
         for (File f: files) {
-            if (i==n)
+            if (count == n)
                 break;
-            i++;
-            list.add(f);
+            if (Util.getZxidFromName(f.getName(), "snapshot") != -1) {
+                count++;
+                list.add(f);
+            }
         }
         return list;
     }
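
The effect of the findNRecentSnapshots change (count only files whose names parse as snapshots) can be shown with a small stand-alone sketch. The names RecentSnapshotsSketch, snapshotZxid and findNRecent are hypothetical, and the sketch works on file-name strings rather than on the real FileSnap and Util classes.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration of selecting the n most recent snapshots; not ZooKeeper code. */
final class RecentSnapshotsSketch {

    /** Returns the zxid encoded in "snapshot.<hex>", or -1 for any other name. */
    static long snapshotZxid(String name) {
        String[] parts = name.split("\\.");
        if (parts.length != 2 || !parts[0].equals("snapshot")) {
            return -1L;
        }
        try {
            long zxid = Long.parseLong(parts[1], 16);
            return zxid < 0 ? -1L : zxid;
        } catch (NumberFormatException e) {
            return -1L;
        }
    }

    /** Newest-first list of at most n names that really are snapshots. */
    static List<String> findNRecent(List<String> names, int n) {
        List<String> sorted = new ArrayList<>(names);
        // Descending by zxid; non-snapshot names sort last with zxid -1.
        sorted.sort((a, b) -> Long.compare(snapshotZxid(b), snapshotZxid(a)));
        List<String> result = new ArrayList<>();
        int count = 0;
        for (String name : sorted) {
            if (count == n) {
                break;
            }
            // Only real snapshots are counted, so asking for more snapshots
            // than exist never pulls in log files or other files.
            if (snapshotZxid(name) != -1L) {
                count++;
                result.add(name);
            }
        }
        return result;
    }
}
```

Asking this sketch for five snapshots in a directory that holds only three returns those three and nothing else, which is the behaviour the new test for ZOOKEEPER-2420 checks.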

+ 4 - 2
src/java/main/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java

@@ -400,8 +400,10 @@ public class FileTxnSnapLog {
     }
 
     /**
-     * get the snapshot logs that are greater than
-     * the given zxid
+     * get the snapshot logs which may contain transactions newer than the given zxid.
+     * This includes logs with starting zxid greater than given zxid, as well as the
+     * newest transaction log with starting zxid less than given zxid.  The latter log
+     * file may contain transactions beyond given zxid.
      * @param zxid the zxid that contains logs greater than
      * zxid
      * @return

+ 168 - 24
src/java/test/org/apache/zookeeper/server/PurgeTxnTest.java

@@ -29,6 +29,7 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.zookeeper.data.Stat;
 
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.PortAssignment;
@@ -40,6 +41,7 @@ import org.apache.zookeeper.server.ServerCnxnFactory;
 import org.apache.zookeeper.server.SyncRequestProcessor;
 import org.apache.zookeeper.server.ZooKeeperServer;
 import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
+import org.apache.zookeeper.server.persistence.Util;
 import org.apache.zookeeper.test.ClientBase;
 import org.junit.After;
 import org.junit.Assert;
@@ -172,10 +174,17 @@ public class PurgeTxnTest extends ZKTestCase {
         int nRecentSnap = 4; // n recent snap shots
         int nRecentCount = 30;
         int offset = 0;
+
         tmpDir = ClientBase.createTmpDir();
         File version2 = new File(tmpDir.toString(), "version-2");
         Assert.assertTrue("Failed to create version_2 dir:" + version2.toString(),
                 version2.mkdir());
+
+        // Test that with no snaps, findNRecentSnapshots returns empty list
+        FileTxnSnapLog txnLog = new FileTxnSnapLog(tmpDir, tmpDir);
+        List<File> foundSnaps = txnLog.findNRecentSnapshots(1);
+        assertEquals(0, foundSnaps.size());
+
         List<File> expectedNRecentSnapFiles = new ArrayList<File>();
         int counter = offset + (2 * nRecentCount);
         for (int i = 0; i < nRecentCount; i++) {
@@ -194,14 +203,25 @@ public class PurgeTxnTest extends ZKTestCase {
             }
         }
 
-        FileTxnSnapLog txnLog = new FileTxnSnapLog(tmpDir, tmpDir);
+        // Test that when we ask for recent snaps we get the number we asked for and
+        // the files we expected
         List<File> nRecentSnapFiles = txnLog.findNRecentSnapshots(nRecentSnap);
-        txnLog.close();
         Assert.assertEquals("exactly 4 snapshots ", 4,
                 nRecentSnapFiles.size());
         expectedNRecentSnapFiles.removeAll(nRecentSnapFiles);
         Assert.assertEquals("Didn't get the recent snap files", 0,
                 expectedNRecentSnapFiles.size());
+
+        // Test that when asking for more snaps than we created, we still only get snaps
+        // not logs or anything else (per ZOOKEEPER-2420)
+        nRecentSnapFiles = txnLog.findNRecentSnapshots(nRecentCount + 5);
+        assertEquals(nRecentCount, nRecentSnapFiles.size());
+        for (File f: nRecentSnapFiles) {
+            Assert.assertTrue("findNRecentSnapshots() returned a non-snapshot: " + f.getPath(),
+                   (Util.getZxidFromName(f.getName(), "snapshot") != -1));
+        }
+
+        txnLog.close();
     }
 
     /**
@@ -225,14 +245,21 @@ public class PurgeTxnTest extends ZKTestCase {
         List<File> logs = new ArrayList<File>();
         List<File> snapsAboveRecentFiles = new ArrayList<File>();
         List<File> logsAboveRecentFiles = new ArrayList<File>();
-        createDataDirFiles(offset, fileToPurgeCount, version2, snapsToPurge,
+        createDataDirFiles(offset, fileToPurgeCount, false, version2, snapsToPurge,
                 logsToPurge);
-        createDataDirFiles(offset, nRecentCount, version2, snaps, logs);
-        createDataDirFiles(offset, fileAboveRecentCount, version2,
+        createDataDirFiles(offset, nRecentCount, false, version2, snaps, logs);
+        logs.add(logsToPurge.remove(0)); // log that precedes first retained snapshot is also retained
+        createDataDirFiles(offset, fileAboveRecentCount, false, version2,
                 snapsAboveRecentFiles, logsAboveRecentFiles);
 
+        /**
+         * The newest log file preceding the oldest retained snapshot is not removed as it may
+         * contain transactions newer than the oldest snapshot.
+         */
+        logsToPurge.remove(0);
+
         FileTxnSnapLog txnLog = new FileTxnSnapLog(tmpDir, tmpDir);
-        PurgeTxnLog.retainNRecentSnapshots(txnLog, snaps);
+        PurgeTxnLog.purgeOlderSnapshots(txnLog, snaps.get(snaps.size() - 1));
         txnLog.close();
         verifyFilesAfterPurge(snapsToPurge, false);
         verifyFilesAfterPurge(logsToPurge, false);
@@ -243,11 +270,24 @@ public class PurgeTxnTest extends ZKTestCase {
     }
 
     /**
-     * Tests purge where the data directory contains snap files equals to the
+     * Tests purge where the data directory contains snap files and log files equals to the
      * number of files to be retained
      */
     @Test
     public void testSnapFilesEqualsToRetain() throws Exception {
+        internalTestSnapFilesEqualsToRetain(false);
+    }
+
+    /**
+     * Tests purge where the data directory contains snap files equals to the
+     * number of files to be retained, and a log file that precedes the earliest snapshot
+     */
+    @Test
+    public void testSnapFilesEqualsToRetainWithPrecedingLog() throws Exception {
+        internalTestSnapFilesEqualsToRetain(true);
+    }
+
+    public void internalTestSnapFilesEqualsToRetain(boolean testWithPrecedingLogFile) throws Exception {
         int nRecentCount = 3;
         AtomicInteger offset = new AtomicInteger(0);
         tmpDir = ClientBase.createTmpDir();
@@ -256,10 +296,10 @@ public class PurgeTxnTest extends ZKTestCase {
                 version2.mkdir());
         List<File> snaps = new ArrayList<File>();
         List<File> logs = new ArrayList<File>();
-        createDataDirFiles(offset, nRecentCount, version2, snaps, logs);
+        createDataDirFiles(offset, nRecentCount, testWithPrecedingLogFile, version2, snaps, logs);
 
         FileTxnSnapLog txnLog = new FileTxnSnapLog(tmpDir, tmpDir);
-        PurgeTxnLog.retainNRecentSnapshots(txnLog, snaps);
+        PurgeTxnLog.purgeOlderSnapshots(txnLog, snaps.get(snaps.size() - 1));
         txnLog.close();
         verifyFilesAfterPurge(snaps, true);
         verifyFilesAfterPurge(logs, true);
@@ -282,12 +322,19 @@ public class PurgeTxnTest extends ZKTestCase {
         List<File> logsToPurge = new ArrayList<File>();
         List<File> snaps = new ArrayList<File>();
         List<File> logs = new ArrayList<File>();
-        createDataDirFiles(offset, fileToPurgeCount, version2, snapsToPurge,
+        createDataDirFiles(offset, fileToPurgeCount, false, version2, snapsToPurge,
                 logsToPurge);
-        createDataDirFiles(offset, nRecentCount, version2, snaps, logs);
+        createDataDirFiles(offset, nRecentCount, false, version2, snaps, logs);
+        logs.add(logsToPurge.remove(0)); // log that precedes first retained snapshot is also retained
+
+        /**
+         * The newest log file preceding the oldest retained snapshot is not removed as it may
+         * contain transactions newer than the oldest snapshot.
+         */
+        logsToPurge.remove(0);
 
         FileTxnSnapLog txnLog = new FileTxnSnapLog(tmpDir, tmpDir);
-        PurgeTxnLog.retainNRecentSnapshots(txnLog, snaps);
+        PurgeTxnLog.purgeOlderSnapshots(txnLog, snaps.get(snaps.size() - 1));
         txnLog.close();
         verifyFilesAfterPurge(snapsToPurge, false);
         verifyFilesAfterPurge(logsToPurge, false);
@@ -327,15 +374,16 @@ public class PurgeTxnTest extends ZKTestCase {
             snapFile.createNewFile();
         }
 
-        int numberOfFilesToKeep = 10;
+        int numberOfSnapFilesToKeep = 10;
         // scenario where four parameter are passed
         String[] args = new String[] { dataLogDir.getAbsolutePath(),
                 dataDir.getAbsolutePath(), "-n",
-                Integer.toString(numberOfFilesToKeep) };
+                Integer.toString(numberOfSnapFilesToKeep) };
         PurgeTxnLog.main(args);
 
-        assertEquals(numberOfFilesToKeep, dataDirVersion2.listFiles().length);
-        assertEquals(numberOfFilesToKeep, dataLogDirVersion2.listFiles().length);
+        assertEquals(numberOfSnapFilesToKeep, dataDirVersion2.listFiles().length);
+        // Since for each snapshot we have a log file with same zxid, expect same # logs as snaps to be kept
+        assertEquals(numberOfSnapFilesToKeep, dataLogDirVersion2.listFiles().length);
         ClientBase.recursiveDelete(tmpDir);
 
     }
@@ -371,28 +419,121 @@ public class PurgeTxnTest extends ZKTestCase {
             snapFile.createNewFile();
         }
 
-        int numberOfFilesToKeep = 10;
+        int numberOfSnapFilesToKeep = 10;
         // scenario where only three parameter are passed
         String[] args = new String[] { dataLogDir.getAbsolutePath(), "-n",
-                Integer.toString(numberOfFilesToKeep) };
+                Integer.toString(numberOfSnapFilesToKeep) };
         PurgeTxnLog.main(args);
-        assertEquals(numberOfFilesToKeep + numberOfFilesToKeep,
+        assertEquals(numberOfSnapFilesToKeep * 2, // Since for each snapshot we have a log file with same zxid, expect same # logs as snaps to be kept
                 dataLogDirVersion2.listFiles().length);
         ClientBase.recursiveDelete(tmpDir);
 
     }
 
 
-    private void createDataDirFiles(AtomicInteger offset, int limit,
+    /**
+     * Verifies that purge does not delete any log files which started before the oldest retained
+     * snapshot but which might extend beyond it.
+     * @throws Exception an exception might be thrown here
+     */
+    @Test
+    public void testPurgeDoesNotDeleteOverlappingLogFile() throws Exception {
+        // Setting used for snapRetainCount in this test.
+        final int SNAP_RETAIN_COUNT = 3;
+        // Number of znodes this test creates in each snapshot.
+        final int NUM_ZNODES_PER_SNAPSHOT = 100;
+        /**
+         * Set a sufficiently high snapCount to ensure that we don't rollover the log.  Normally,
+         * the default value (100K at time of this writing) would ensure this, but we make that
+         * dependence explicit here to make the test future-proof.  Not rolling over the log is
+         * important for this test since we are testing retention of the one and only log file which
+         * predates each retained snapshot.
+         */
+        SyncRequestProcessor.setSnapCount(SNAP_RETAIN_COUNT * NUM_ZNODES_PER_SNAPSHOT * 10);
+
+        // Create Zookeeper and connect to it.
+        tmpDir = ClientBase.createTmpDir();
+        ClientBase.setupTestEnv();
+        ZooKeeperServer zks = new ZooKeeperServer(tmpDir, tmpDir, 3000);
+        final int PORT = Integer.parseInt(HOSTPORT.split(":")[1]);
+        ServerCnxnFactory f = ServerCnxnFactory.createFactory(PORT, -1);
+        f.startup(zks);
+        Assert.assertTrue("waiting for server being up ",
+                ClientBase.waitForServerUp(HOSTPORT,CONNECTION_TIMEOUT));
+        ZooKeeper zk = ClientBase.createZKClient(HOSTPORT);
+
+        // Unique identifier for each znode that we create.
+        int unique = 0;
+        try {
+            /**
+             * Create some znodes and take a snapshot.  Repeat this until we have SNAP_RETAIN_COUNT
+             * snapshots.  Do not rollover the log.
+             */
+            for (int snapshotCount = 0; snapshotCount < SNAP_RETAIN_COUNT; snapshotCount++) {
+                for (int i = 0; i< 100; i++, unique++) {
+                    zk.create("/snap-" + unique, new byte[0], Ids.OPEN_ACL_UNSAFE,
+                            CreateMode.PERSISTENT);
+                }
+                zks.takeSnapshot();
+            }
+            // Create some additional znodes without taking a snapshot afterwards.
+            for (int i = 0; i< 100; i++, unique++) {
+                zk.create("/snap-" + unique, new byte[0], Ids.OPEN_ACL_UNSAFE,
+                        CreateMode.PERSISTENT);
+            }
+        } finally {
+            zk.close();
+        }
+
+        // Shutdown Zookeeper.
+        f.shutdown();
+        zks.getTxnLogFactory().close();
+        zks.shutdown();
+        Assert.assertTrue("waiting for server to shutdown",
+                ClientBase.waitForServerDown(HOSTPORT, CONNECTION_TIMEOUT));
+
+        // Purge snapshot and log files.
+        PurgeTxnLog.purge(tmpDir, tmpDir, SNAP_RETAIN_COUNT);
+
+        // Initialize Zookeeper again from the same dataDir.
+        zks = new ZooKeeperServer(tmpDir, tmpDir, 3000);
+        f = ServerCnxnFactory.createFactory(PORT, -1);
+        f.startup(zks);
+        zk = ClientBase.createZKClient(HOSTPORT);
+
+        /**
+         * Verify that the last znode that was created above exists.  This znode's creation was
+         * captured by the transaction log which was created before any of the above
+         * SNAP_RETAIN_COUNT snapshots were created, but it's not captured in any of these
+         * snapshots.  So for it to exist, the (only) existing log file should not have been purged.
+         */
+        final String lastZnode = "/snap-" + (unique - 1);
+        final Stat stat = zk.exists(lastZnode, false);
+        Assert.assertNotNull("Last znode does not exist: " + lastZnode, stat);
+
+        // Shutdown for the last time.
+        f.shutdown();
+        zks.getTxnLogFactory().close();
+        zks.shutdown();
+    }
+
+    private File createDataDirLogFile(File version_2, int Zxid) throws IOException {
+        File logFile = new File(version_2 + "/log." + Long.toHexString(Zxid));
+        Assert.assertTrue("Failed to create log File:" + logFile.toString(),
+                logFile.createNewFile());
+        return logFile;
+    }
+
+    private void createDataDirFiles(AtomicInteger offset, int limit, boolean createPrecedingLogFile,
             File version_2, List<File> snaps, List<File> logs)
             throws IOException {
         int counter = offset.get() + (2 * limit);
+        if (createPrecedingLogFile) {
+            counter++;
+        }
         offset.set(counter);
         for (int i = 0; i < limit; i++) {
             // simulate log file
-            File logFile = new File(version_2 + "/log." + Long.toHexString(--counter));
-            Assert.assertTrue("Failed to create log File:" + logFile.toString(),
-                    logFile.createNewFile());
-            logs.add(logFile);
+            logs.add(createDataDirLogFile(version_2, --counter));
             // simulate snapshot file
             File snapFile = new File(version_2 + "/snapshot."
                     + Long.toHexString(--counter));
@@ -400,6 +541,9 @@ public class PurgeTxnTest extends ZKTestCase {
                     snapFile.createNewFile());
             snaps.add(snapFile);
         }
+        if (createPrecedingLogFile) {
+            logs.add(createDataDirLogFile(version_2, --counter));
+        }
     }
 
     private void verifyFilesAfterPurge(List<File> logs, boolean exists) {