|
@@ -43,10 +43,10 @@ import org.apache.zookeeper.server.DataTree;
|
|
|
import org.apache.zookeeper.server.DataNode;
|
|
|
import org.apache.zookeeper.txn.CreateTxn;
|
|
|
import org.apache.zookeeper.txn.DeleteTxn;
|
|
|
-import org.apache.zookeeper.txn.TxnHeader;
|
|
|
import org.apache.zookeeper.ZooDefs.OpCode;
|
|
|
import org.apache.jute.Record;
|
|
|
import java.io.FileInputStream;
|
|
|
+
|
|
|
import org.apache.jute.BinaryInputArchive;
|
|
|
import org.apache.zookeeper.server.persistence.FileHeader;
|
|
|
|
|
@@ -59,6 +59,7 @@ public class LoadFromLogTest extends ZKTestCase implements Watcher {
|
|
|
// setting up the quorum has a transaction overhead for creating and closing the session
|
|
|
private static final int TRANSACTION_OVERHEAD = 2;
|
|
|
private static final int TOTAL_TRANSACTIONS = NUM_MESSAGES + TRANSACTION_OVERHEAD;
|
|
|
+ private volatile boolean connected;
|
|
|
|
|
|
/**
|
|
|
* test that all transactions from the Log are loaded, and only once
|
|
@@ -114,7 +115,22 @@ public class LoadFromLogTest extends ZKTestCase implements Watcher {
|
|
|
|
|
|
|
|
|
public void process(WatchedEvent event) {
|
|
|
- // do nothing
|
|
|
+ switch (event.getType()) {
|
|
|
+ case None:
|
|
|
+ switch (event.getState()) {
|
|
|
+ case SyncConnected:
|
|
|
+ connected = true;
|
|
|
+ break;
|
|
|
+ case Disconnected:
|
|
|
+ connected = false;
|
|
|
+ break;
|
|
|
+ default:
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ break;
|
|
|
+ default:
|
|
|
+ break;
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -215,4 +231,111 @@ public class LoadFromLogTest extends ZKTestCase implements Watcher {
|
|
|
Assert.assertTrue("Missing magic number ",
|
|
|
header.getMagic() == FileTxnLog.TXNLOG_MAGIC);
|
|
|
}
|
|
|
-}
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Test we can restore the snapshot that has data ahead of the zxid
|
|
|
+ * of the snapshot file.
|
|
|
+ */
|
|
|
+ @Test
|
|
|
+ public void testRestore() throws Exception {
|
|
|
+ // setup a single server cluster
|
|
|
+ File tmpDir = ClientBase.createTmpDir();
|
|
|
+ ClientBase.setupTestEnv();
|
|
|
+ ZooKeeperServer zks = new ZooKeeperServer(tmpDir, tmpDir, 3000);
|
|
|
+ SyncRequestProcessor.setSnapCount(10000);
|
|
|
+ final int PORT = Integer.parseInt(HOSTPORT.split(":")[1]);
|
|
|
+ ServerCnxnFactory f = ServerCnxnFactory.createFactory(PORT, -1);
|
|
|
+ f.startup(zks);
|
|
|
+ Assert.assertTrue("waiting for server being up ", ClientBase
|
|
|
+ .waitForServerUp(HOSTPORT, CONNECTION_TIMEOUT));
|
|
|
+ ZooKeeper zk = new ZooKeeper(HOSTPORT, CONNECTION_TIMEOUT, this);
|
|
|
+
|
|
|
+ long start = System.currentTimeMillis();
|
|
|
+ while (!connected) {
|
|
|
+ long end = System.currentTimeMillis();
|
|
|
+ if (end - start > 5000) {
|
|
|
+ Assert.assertTrue("Could not connect with server in 5 seconds",
|
|
|
+ false);
|
|
|
+ }
|
|
|
+ try {
|
|
|
+ Thread.sleep(200);
|
|
|
+ } catch (Exception e) {
|
|
|
+ LOG.warn("Intrrupted");
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
+ // generate some transactions
|
|
|
+ String lastPath = null;
|
|
|
+ try {
|
|
|
+ zk.create("/invalidsnap", new byte[0], Ids.OPEN_ACL_UNSAFE,
|
|
|
+ CreateMode.PERSISTENT);
|
|
|
+ for (int i = 0; i < NUM_MESSAGES; i++) {
|
|
|
+ lastPath = zk.create("/invalidsnap/test-", new byte[0],
|
|
|
+ Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
|
|
|
+ }
|
|
|
+ } finally {
|
|
|
+ zk.close();
|
|
|
+ }
|
|
|
+ String[] tokens = lastPath.split("-");
|
|
|
+ String expectedPath = "/invalidsnap/test-"
|
|
|
+ + String.format("%010d",
|
|
|
+ (new Integer(tokens[1])).intValue() + 1);
|
|
|
+ long eZxid = zks.getZKDatabase().getDataTreeLastProcessedZxid();
|
|
|
+ // force the zxid to be behind the content
|
|
|
+ zks.getZKDatabase().setlastProcessedZxid(
|
|
|
+ zks.getZKDatabase().getDataTreeLastProcessedZxid() - 10);
|
|
|
+ LOG.info("Set lastProcessedZxid to "
|
|
|
+ + zks.getZKDatabase().getDataTreeLastProcessedZxid());
|
|
|
+ // Force snapshot and restore
|
|
|
+ zks.takeSnapshot();
|
|
|
+ zks.shutdown();
|
|
|
+ f.shutdown();
|
|
|
+
|
|
|
+ zks = new ZooKeeperServer(tmpDir, tmpDir, 3000);
|
|
|
+ SyncRequestProcessor.setSnapCount(10000);
|
|
|
+ f = ServerCnxnFactory.createFactory(PORT, -1);
|
|
|
+ f.startup(zks);
|
|
|
+ Assert.assertTrue("waiting for server being up ", ClientBase
|
|
|
+ .waitForServerUp(HOSTPORT, CONNECTION_TIMEOUT));
|
|
|
+ connected = false;
|
|
|
+ long fZxid = zks.getZKDatabase().getDataTreeLastProcessedZxid();
|
|
|
+
|
|
|
+ // Verify lastProcessedZxid is set correctly
|
|
|
+ Assert.assertTrue("Restore failed expected zxid=" + eZxid + " found="
|
|
|
+ + fZxid, fZxid == eZxid);
|
|
|
+ zk = new ZooKeeper(HOSTPORT, CONNECTION_TIMEOUT, this);
|
|
|
+ start = System.currentTimeMillis();
|
|
|
+ while (!connected) {
|
|
|
+ long end = System.currentTimeMillis();
|
|
|
+ if (end - start > 5000) {
|
|
|
+ Assert.assertTrue("Could not connect with server in 5 seconds",
|
|
|
+ false);
|
|
|
+ }
|
|
|
+ try {
|
|
|
+ Thread.sleep(200);
|
|
|
+ } catch (Exception e) {
|
|
|
+ LOG.warn("Intrrupted");
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
+ // Verify correctness of data and whether sequential znode creation
|
|
|
+ // proceeds correctly after this point
|
|
|
+ String[] children;
|
|
|
+ String path;
|
|
|
+ try {
|
|
|
+ children = zk.getChildren("/invalidsnap", false).toArray(
|
|
|
+ new String[0]);
|
|
|
+ path = zk.create("/invalidsnap/test-", new byte[0],
|
|
|
+ Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
|
|
|
+ } finally {
|
|
|
+ zk.close();
|
|
|
+ }
|
|
|
+ LOG.info("Expected " + expectedPath + " found " + path);
|
|
|
+ Assert.assertTrue("Error in sequential znode creation expected "
|
|
|
+ + expectedPath + " found " + path, path.equals(expectedPath));
|
|
|
+ Assert.assertTrue("Unexpected number of children " + children.length
|
|
|
+ + " expected " + NUM_MESSAGES,
|
|
|
+ (children.length == NUM_MESSAGES));
|
|
|
+ f.shutdown();
|
|
|
+ }
|
|
|
+}
|