|
@@ -25,6 +25,7 @@ import static org.junit.Assert.assertTrue;
|
|
|
import static org.junit.Assert.fail;
|
|
|
import java.io.File;
|
|
|
import java.io.IOException;
|
|
|
+import java.io.OutputStream;
|
|
|
import java.util.ArrayList;
|
|
|
import java.util.Collections;
|
|
|
import java.util.List;
|
|
@@ -32,16 +33,23 @@ import java.util.concurrent.CountDownLatch;
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
import java.util.concurrent.atomic.AtomicBoolean;
|
|
|
import java.util.concurrent.atomic.AtomicInteger;
|
|
|
+import java.util.zip.CheckedOutputStream;
|
|
|
+import org.apache.jute.BinaryOutputArchive;
|
|
|
+import org.apache.jute.OutputArchive;
|
|
|
import org.apache.zookeeper.CreateMode;
|
|
|
import org.apache.zookeeper.PortAssignment;
|
|
|
import org.apache.zookeeper.ZKTestCase;
|
|
|
import org.apache.zookeeper.ZooDefs.Ids;
|
|
|
import org.apache.zookeeper.ZooKeeper;
|
|
|
import org.apache.zookeeper.data.Stat;
|
|
|
+import org.apache.zookeeper.server.persistence.FileHeader;
|
|
|
+import org.apache.zookeeper.server.persistence.FileSnap;
|
|
|
import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
|
|
|
+import org.apache.zookeeper.server.persistence.SnapStream;
|
|
|
import org.apache.zookeeper.server.persistence.Util;
|
|
|
import org.apache.zookeeper.test.ClientBase;
|
|
|
import org.junit.After;
|
|
|
+import org.junit.Before;
|
|
|
import org.junit.Test;
|
|
|
import org.slf4j.Logger;
|
|
|
import org.slf4j.LoggerFactory;
|
|
@@ -54,6 +62,11 @@ public class PurgeTxnTest extends ZKTestCase {
|
|
|
private static final long OP_TIMEOUT_IN_MILLIS = 90000;
|
|
|
private File tmpDir;
|
|
|
|
|
|
+ @Before
|
|
|
+ public void setUp() throws Exception {
|
|
|
+ tmpDir = ClientBase.createTmpDir();
|
|
|
+ }
|
|
|
+
|
|
|
@After
|
|
|
public void teardown() {
|
|
|
if (null != tmpDir) {
|
|
@@ -67,7 +80,6 @@ public class PurgeTxnTest extends ZKTestCase {
|
|
|
*/
|
|
|
@Test
|
|
|
public void testPurge() throws Exception {
|
|
|
- tmpDir = ClientBase.createTmpDir();
|
|
|
ClientBase.setupTestEnv();
|
|
|
ZooKeeperServer zks = new ZooKeeperServer(tmpDir, tmpDir, 3000);
|
|
|
SyncRequestProcessor.setSnapCount(100);
|
|
@@ -89,7 +101,7 @@ public class PurgeTxnTest extends ZKTestCase {
|
|
|
// now corrupt the snapshot
|
|
|
PurgeTxnLog.purge(tmpDir, tmpDir, 3);
|
|
|
FileTxnSnapLog snaplog = new FileTxnSnapLog(tmpDir, tmpDir);
|
|
|
- List<File> listLogs = snaplog.findNRecentSnapshots(4);
|
|
|
+ List<File> listLogs = snaplog.findNValidSnapshots(4);
|
|
|
int numSnaps = 0;
|
|
|
for (File ff : listLogs) {
|
|
|
if (ff.getName().startsWith("snapshot")) {
|
|
@@ -111,7 +123,6 @@ public class PurgeTxnTest extends ZKTestCase {
|
|
|
*/
|
|
|
@Test
|
|
|
public void testPurgeWhenLogRollingInProgress() throws Exception {
|
|
|
- tmpDir = ClientBase.createTmpDir();
|
|
|
ClientBase.setupTestEnv();
|
|
|
ZooKeeperServer zks = new ZooKeeperServer(tmpDir, tmpDir, 3000);
|
|
|
SyncRequestProcessor.setSnapCount(30);
|
|
@@ -158,21 +169,20 @@ public class PurgeTxnTest extends ZKTestCase {
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * Tests finding n recent snapshots from set of snapshots and data logs
|
|
|
+ * Tests finding n recent valid snapshots from set of snapshots and data logs
|
|
|
*/
|
|
|
@Test
|
|
|
- public void testFindNRecentSnapshots() throws Exception {
|
|
|
+ public void testFindNValidSnapshots() throws Exception {
|
|
|
int nRecentSnap = 4; // n recent snap shots
|
|
|
int nRecentCount = 30;
|
|
|
int offset = 0;
|
|
|
|
|
|
- tmpDir = ClientBase.createTmpDir();
|
|
|
File version2 = new File(tmpDir.toString(), "version-2");
|
|
|
assertTrue("Failed to create version_2 dir:" + version2.toString(), version2.mkdir());
|
|
|
|
|
|
- // Test that with no snaps, findNRecentSnapshots returns empty list
|
|
|
+ // Test that with no snaps, findNValidSnapshots returns empty list
|
|
|
FileTxnSnapLog txnLog = new FileTxnSnapLog(tmpDir, tmpDir);
|
|
|
- List<File> foundSnaps = txnLog.findNRecentSnapshots(1);
|
|
|
+ List<File> foundSnaps = txnLog.findNValidSnapshots(1);
|
|
|
assertEquals(0, foundSnaps.size());
|
|
|
|
|
|
List<File> expectedNRecentSnapFiles = new ArrayList<File>();
|
|
@@ -184,6 +194,7 @@ public class PurgeTxnTest extends ZKTestCase {
|
|
|
// simulate snapshot file
|
|
|
File snapFile = new File(version2 + "/snapshot." + Long.toHexString(--counter));
|
|
|
assertTrue("Failed to create snap File:" + snapFile.toString(), snapFile.createNewFile());
|
|
|
+ makeValidSnapshot(snapFile);
|
|
|
// add the n recent snap files for assertion
|
|
|
if (i < nRecentSnap) {
|
|
|
expectedNRecentSnapFiles.add(snapFile);
|
|
@@ -192,17 +203,17 @@ public class PurgeTxnTest extends ZKTestCase {
|
|
|
|
|
|
// Test that when we ask for recent snaps we get the number we asked for and
|
|
|
// the files we expected
|
|
|
- List<File> nRecentSnapFiles = txnLog.findNRecentSnapshots(nRecentSnap);
|
|
|
- assertEquals("exactly 4 snapshots ", 4, nRecentSnapFiles.size());
|
|
|
- expectedNRecentSnapFiles.removeAll(nRecentSnapFiles);
|
|
|
+ List<File> nRecentValidSnapFiles = txnLog.findNValidSnapshots(nRecentSnap);
|
|
|
+ assertEquals("exactly 4 snapshots ", 4, nRecentValidSnapFiles.size());
|
|
|
+ expectedNRecentSnapFiles.removeAll(nRecentValidSnapFiles);
|
|
|
assertEquals("Didn't get the recent snap files", 0, expectedNRecentSnapFiles.size());
|
|
|
|
|
|
// Test that when asking for more snaps than we created, we still only get snaps
|
|
|
// not logs or anything else (per ZOOKEEPER-2420)
|
|
|
- nRecentSnapFiles = txnLog.findNRecentSnapshots(nRecentCount + 5);
|
|
|
- assertEquals(nRecentCount, nRecentSnapFiles.size());
|
|
|
- for (File f : nRecentSnapFiles) {
|
|
|
- assertTrue("findNRecentSnapshots() returned a non-snapshot: "
|
|
|
+ nRecentValidSnapFiles = txnLog.findNValidSnapshots(nRecentCount + 5);
|
|
|
+ assertEquals(nRecentCount, nRecentValidSnapFiles.size());
|
|
|
+ for (File f : nRecentValidSnapFiles) {
|
|
|
+ assertTrue("findNValidSnapshots() returned a non-snapshot: "
|
|
|
+ f.getPath(), (Util.getZxidFromName(f.getName(), "snapshot") != -1));
|
|
|
}
|
|
|
|
|
@@ -220,7 +231,6 @@ public class PurgeTxnTest extends ZKTestCase {
|
|
|
int fileAboveRecentCount = 4;
|
|
|
int fileToPurgeCount = 2;
|
|
|
AtomicInteger offset = new AtomicInteger(0);
|
|
|
- tmpDir = ClientBase.createTmpDir();
|
|
|
File version2 = new File(tmpDir.toString(), "version-2");
|
|
|
assertTrue("Failed to create version_2 dir:" + version2.toString(), version2.mkdir());
|
|
|
List<File> snapsToPurge = new ArrayList<File>();
|
|
@@ -272,7 +282,6 @@ public class PurgeTxnTest extends ZKTestCase {
|
|
|
public void internalTestSnapFilesEqualsToRetain(boolean testWithPrecedingLogFile) throws Exception {
|
|
|
int nRecentCount = 3;
|
|
|
AtomicInteger offset = new AtomicInteger(0);
|
|
|
- tmpDir = ClientBase.createTmpDir();
|
|
|
File version2 = new File(tmpDir.toString(), "version-2");
|
|
|
assertTrue("Failed to create version_2 dir:" + version2.toString(), version2.mkdir());
|
|
|
List<File> snaps = new ArrayList<File>();
|
|
@@ -295,7 +304,6 @@ public class PurgeTxnTest extends ZKTestCase {
|
|
|
int nRecentCount = 4;
|
|
|
int fileToPurgeCount = 2;
|
|
|
AtomicInteger offset = new AtomicInteger(0);
|
|
|
- tmpDir = ClientBase.createTmpDir();
|
|
|
File version2 = new File(tmpDir.toString(), "version-2");
|
|
|
assertTrue("Failed to create version_2 dir:" + version2.toString(), version2.mkdir());
|
|
|
List<File> snapsToPurge = new ArrayList<File>();
|
|
@@ -327,7 +335,6 @@ public class PurgeTxnTest extends ZKTestCase {
|
|
|
*/
|
|
|
@Test
|
|
|
public void testPurgeTxnLogWithDataDir() throws Exception {
|
|
|
- tmpDir = ClientBase.createTmpDir();
|
|
|
File dataDir = new File(tmpDir, "dataDir");
|
|
|
File dataLogDir = new File(tmpDir, "dataLogDir");
|
|
|
|
|
@@ -348,6 +355,7 @@ public class PurgeTxnTest extends ZKTestCase {
|
|
|
// simulate snapshot file
|
|
|
File snapFile = new File(dataDirVersion2, "snapshot." + Long.toHexString(i));
|
|
|
snapFile.createNewFile();
|
|
|
+ makeValidSnapshot(snapFile);
|
|
|
}
|
|
|
|
|
|
int numberOfSnapFilesToKeep = 10;
|
|
@@ -358,8 +366,6 @@ public class PurgeTxnTest extends ZKTestCase {
|
|
|
assertEquals(numberOfSnapFilesToKeep, dataDirVersion2.listFiles().length);
|
|
|
// Since for each snapshot we have a log file with same zxid, expect same # logs as snaps to be kept
|
|
|
assertEquals(numberOfSnapFilesToKeep, dataLogDirVersion2.listFiles().length);
|
|
|
- ClientBase.recursiveDelete(tmpDir);
|
|
|
-
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -368,7 +374,6 @@ public class PurgeTxnTest extends ZKTestCase {
|
|
|
*/
|
|
|
@Test
|
|
|
public void testPurgeTxnLogWithoutDataDir() throws Exception {
|
|
|
- tmpDir = ClientBase.createTmpDir();
|
|
|
File dataDir = new File(tmpDir, "dataDir");
|
|
|
File dataLogDir = new File(tmpDir, "dataLogDir");
|
|
|
|
|
@@ -388,6 +393,7 @@ public class PurgeTxnTest extends ZKTestCase {
|
|
|
// simulate snapshot file
|
|
|
File snapFile = new File(dataLogDirVersion2, "snapshot." + Long.toHexString(i));
|
|
|
snapFile.createNewFile();
|
|
|
+ makeValidSnapshot(snapFile);
|
|
|
}
|
|
|
|
|
|
int numberOfSnapFilesToKeep = 10;
|
|
@@ -398,8 +404,6 @@ public class PurgeTxnTest extends ZKTestCase {
|
|
|
numberOfSnapFilesToKeep
|
|
|
* 2, // Since for each snapshot we have a log file with same zxid, expect same # logs as snaps to be kept
|
|
|
dataLogDirVersion2.listFiles().length);
|
|
|
- ClientBase.recursiveDelete(tmpDir);
|
|
|
-
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -423,7 +427,6 @@ public class PurgeTxnTest extends ZKTestCase {
|
|
|
SyncRequestProcessor.setSnapCount(SNAP_RETAIN_COUNT * NUM_ZNODES_PER_SNAPSHOT * 10);
|
|
|
|
|
|
// Create Zookeeper and connect to it.
|
|
|
- tmpDir = ClientBase.createTmpDir();
|
|
|
ClientBase.setupTestEnv();
|
|
|
ZooKeeperServer zks = new ZooKeeperServer(tmpDir, tmpDir, 3000);
|
|
|
final int PORT = Integer.parseInt(HOSTPORT.split(":")[1]);
|
|
@@ -484,6 +487,45 @@ public class PurgeTxnTest extends ZKTestCase {
|
|
|
zks.shutdown();
|
|
|
}
|
|
|
|
|
|
+ @Test
|
|
|
+ public void testPurgeTxnLogWhenRecentSnapshotsAreAllInvalid() throws Exception {
|
|
|
+ File dataDir = new File(tmpDir, "dataDir");
|
|
|
+ File dataLogDir = new File(tmpDir, "dataLogDir");
|
|
|
+
|
|
|
+ File dataDirVersion2 = new File(dataDir, "version-2");
|
|
|
+ dataDirVersion2.mkdirs();
|
|
|
+ File dataLogDirVersion2 = new File(dataLogDir, "version-2");
|
|
|
+ dataLogDirVersion2.mkdirs();
|
|
|
+
|
|
|
+ // create dummy log and transaction file
|
|
|
+ int totalFiles = 10;
|
|
|
+ int numberOfSnapFilesToKeep = 3;
|
|
|
+
|
|
|
+ // create transaction and snapshot files in different-different
|
|
|
+ // directories
|
|
|
+ for (int i = 0; i < totalFiles; i++) {
|
|
|
+ // simulate log file
|
|
|
+ File logFile = new File(dataLogDirVersion2, "log." + Long.toHexString(i));
|
|
|
+ logFile.createNewFile();
|
|
|
+ // simulate snapshot file
|
|
|
+ File snapFile = new File(dataDirVersion2, "snapshot." + Long.toHexString(i));
|
|
|
+ snapFile.createNewFile();
|
|
|
+ if (i < (totalFiles - numberOfSnapFilesToKeep)) {
|
|
|
+ makeValidSnapshot(snapFile);
|
|
|
+ } else {
|
|
|
+ makeInvalidSnapshot(snapFile);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // scenario where four parameter are passed
|
|
|
+ String[] args = new String[]{dataLogDir.getAbsolutePath(), dataDir.getAbsolutePath(), "-n", Integer.toString(numberOfSnapFilesToKeep)};
|
|
|
+ PurgeTxnLog.main(args);
|
|
|
+ //Since the recent 3 snapshots are all invalid,when purging, we can assert that 6 snapshot files are retained(3 invalid snapshots and 3 retained valid snapshots)
|
|
|
+ assertEquals(numberOfSnapFilesToKeep + numberOfSnapFilesToKeep, dataDirVersion2.listFiles().length);
|
|
|
+ // Since for each snapshot we have a log file with same zxid, expect same # logs as snaps to be kept
|
|
|
+ assertEquals(numberOfSnapFilesToKeep + numberOfSnapFilesToKeep, dataLogDirVersion2.listFiles().length);
|
|
|
+ }
|
|
|
+
|
|
|
private File createDataDirLogFile(File version_2, int Zxid) throws IOException {
|
|
|
File logFile = new File(version_2 + "/log." + Long.toHexString(Zxid));
|
|
|
assertTrue("Failed to create log File:" + logFile.toString(), logFile.createNewFile());
|
|
@@ -553,4 +595,26 @@ public class PurgeTxnTest extends ZKTestCase {
|
|
|
return znodes;
|
|
|
}
|
|
|
|
|
|
+ private void makeValidSnapshot(File snapFile) throws IOException {
|
|
|
+ SnapStream.setStreamMode(SnapStream.StreamMode.CHECKED);
|
|
|
+ CheckedOutputStream os = SnapStream.getOutputStream(snapFile);
|
|
|
+ OutputArchive oa = BinaryOutputArchive.getArchive(os);
|
|
|
+ FileHeader header = new FileHeader(FileSnap.SNAP_MAGIC, 2, 1);
|
|
|
+ header.serialize(oa, "fileheader");
|
|
|
+ SnapStream.sealStream(os, oa);
|
|
|
+ os.flush();
|
|
|
+ os.close();
|
|
|
+
|
|
|
+ assertTrue(SnapStream.isValidSnapshot(snapFile));
|
|
|
+ }
|
|
|
+
|
|
|
+ private void makeInvalidSnapshot(File snapFile) throws IOException {
|
|
|
+ SnapStream.setStreamMode(SnapStream.StreamMode.CHECKED);
|
|
|
+ OutputStream os = SnapStream.getOutputStream(snapFile);
|
|
|
+ os.write(1);
|
|
|
+ os.flush();
|
|
|
+ os.close();
|
|
|
+
|
|
|
+ assertFalse(SnapStream.isValidSnapshot(snapFile));
|
|
|
+ }
|
|
|
}
|