|
@@ -60,7 +60,7 @@ import org.slf4j.Logger;
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
|
|
|
public class LoadFromLogTest extends ZKTestCase {
|
|
|
- private static String HOSTPORT = "127.0.0.1:" + PortAssignment.unique();
|
|
|
+ private static final String HOST = "127.0.0.1:";
|
|
|
private static final int CONNECTION_TIMEOUT = 3000;
|
|
|
private static final int NUM_MESSAGES = 300;
|
|
|
protected static final Logger LOG = LoggerFactory.getLogger(LoadFromLogTest.class);
|
|
@@ -75,17 +75,18 @@ public class LoadFromLogTest extends ZKTestCase {
|
|
|
*/
|
|
|
@Test
|
|
|
public void testLoad() throws Exception {
|
|
|
+ final String hostPort = HOST + PortAssignment.unique();
|
|
|
// setup a single server cluster
|
|
|
File tmpDir = ClientBase.createTmpDir();
|
|
|
ClientBase.setupTestEnv();
|
|
|
ZooKeeperServer zks = new ZooKeeperServer(tmpDir, tmpDir, 3000);
|
|
|
SyncRequestProcessor.setSnapCount(100);
|
|
|
- final int PORT = Integer.parseInt(HOSTPORT.split(":")[1]);
|
|
|
+ final int PORT = Integer.parseInt(hostPort.split(":")[1]);
|
|
|
ServerCnxnFactory f = ServerCnxnFactory.createFactory(PORT, -1);
|
|
|
f.startup(zks);
|
|
|
Assert.assertTrue("waiting for server being up ",
|
|
|
- ClientBase.waitForServerUp(HOSTPORT,CONNECTION_TIMEOUT));
|
|
|
- ZooKeeper zk = ClientBase.createZKClient(HOSTPORT);
|
|
|
+ ClientBase.waitForServerUp(hostPort,CONNECTION_TIMEOUT));
|
|
|
+ ZooKeeper zk = ClientBase.createZKClient(hostPort);
|
|
|
|
|
|
// generate some transactions that will get logged
|
|
|
try {
|
|
@@ -98,7 +99,7 @@ public class LoadFromLogTest extends ZKTestCase {
|
|
|
}
|
|
|
f.shutdown();
|
|
|
Assert.assertTrue("waiting for server to shutdown",
|
|
|
- ClientBase.waitForServerDown(HOSTPORT, CONNECTION_TIMEOUT));
|
|
|
+ ClientBase.waitForServerDown(hostPort, CONNECTION_TIMEOUT));
|
|
|
|
|
|
// now verify that the FileTxnLog reads every transaction only once
|
|
|
File logDir = new File(tmpDir, FileTxnSnapLog.version + FileTxnSnapLog.VERSION);
|
|
@@ -135,18 +136,19 @@ public class LoadFromLogTest extends ZKTestCase {
|
|
|
*/
|
|
|
@Test
|
|
|
public void testLoadFailure() throws Exception {
|
|
|
+ final String hostPort = HOST + PortAssignment.unique();
|
|
|
// setup a single server cluster
|
|
|
File tmpDir = ClientBase.createTmpDir();
|
|
|
ClientBase.setupTestEnv();
|
|
|
ZooKeeperServer zks = new ZooKeeperServer(tmpDir, tmpDir, 3000);
|
|
|
// So we have at least 4 logs
|
|
|
SyncRequestProcessor.setSnapCount(50);
|
|
|
- final int PORT = Integer.parseInt(HOSTPORT.split(":")[1]);
|
|
|
+ final int PORT = Integer.parseInt(hostPort.split(":")[1]);
|
|
|
ServerCnxnFactory f = ServerCnxnFactory.createFactory(PORT, -1);
|
|
|
f.startup(zks);
|
|
|
Assert.assertTrue("waiting for server being up ",
|
|
|
- ClientBase.waitForServerUp(HOSTPORT,CONNECTION_TIMEOUT));
|
|
|
- ZooKeeper zk = ClientBase.createZKClient(HOSTPORT);
|
|
|
+ ClientBase.waitForServerUp(hostPort,CONNECTION_TIMEOUT));
|
|
|
+ ZooKeeper zk = ClientBase.createZKClient(hostPort);
|
|
|
|
|
|
// generate some transactions that will get logged
|
|
|
try {
|
|
@@ -159,7 +161,7 @@ public class LoadFromLogTest extends ZKTestCase {
|
|
|
}
|
|
|
f.shutdown();
|
|
|
Assert.assertTrue("waiting for server to shutdown",
|
|
|
- ClientBase.waitForServerDown(HOSTPORT, CONNECTION_TIMEOUT));
|
|
|
+ ClientBase.waitForServerDown(hostPort, CONNECTION_TIMEOUT));
|
|
|
|
|
|
File logDir = new File(tmpDir, FileTxnSnapLog.version + FileTxnSnapLog.VERSION);
|
|
|
File[] logFiles = FileTxnLog.getLogFiles(logDir.listFiles(), 0);
|
|
@@ -330,79 +332,80 @@ public class LoadFromLogTest extends ZKTestCase {
|
|
|
*/
|
|
|
@Test
|
|
|
public void testRestore() throws Exception {
|
|
|
- // setup a single server cluster
|
|
|
- File tmpDir = ClientBase.createTmpDir();
|
|
|
- ClientBase.setupTestEnv();
|
|
|
- ZooKeeperServer zks = new ZooKeeperServer(tmpDir, tmpDir, 3000);
|
|
|
- SyncRequestProcessor.setSnapCount(10000);
|
|
|
- final int PORT = Integer.parseInt(HOSTPORT.split(":")[1]);
|
|
|
- ServerCnxnFactory f = ServerCnxnFactory.createFactory(PORT, -1);
|
|
|
- f.startup(zks);
|
|
|
- Assert.assertTrue("waiting for server being up ", ClientBase
|
|
|
- .waitForServerUp(HOSTPORT, CONNECTION_TIMEOUT));
|
|
|
- ZooKeeper zk = getConnectedZkClient();
|
|
|
-
|
|
|
- // generate some transactions
|
|
|
- String lastPath = null;
|
|
|
- try {
|
|
|
- zk.create("/invalidsnap", new byte[0], Ids.OPEN_ACL_UNSAFE,
|
|
|
- CreateMode.PERSISTENT);
|
|
|
- for (int i = 0; i < NUM_MESSAGES; i++) {
|
|
|
- lastPath = zk.create("/invalidsnap/test-", new byte[0],
|
|
|
- Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
|
|
|
- }
|
|
|
- } finally {
|
|
|
- zk.close();
|
|
|
- }
|
|
|
- String[] tokens = lastPath.split("-");
|
|
|
- String expectedPath = "/invalidsnap/test-"
|
|
|
- + String.format("%010d",
|
|
|
- (new Integer(tokens[1])).intValue() + 1);
|
|
|
- long eZxid = zks.getZKDatabase().getDataTreeLastProcessedZxid();
|
|
|
- // force the zxid to be behind the content
|
|
|
- zks.getZKDatabase().setlastProcessedZxid(
|
|
|
- zks.getZKDatabase().getDataTreeLastProcessedZxid() - 10);
|
|
|
- LOG.info("Set lastProcessedZxid to "
|
|
|
- + zks.getZKDatabase().getDataTreeLastProcessedZxid());
|
|
|
- // Force snapshot and restore
|
|
|
- zks.takeSnapshot();
|
|
|
- zks.shutdown();
|
|
|
- f.shutdown();
|
|
|
-
|
|
|
- zks = new ZooKeeperServer(tmpDir, tmpDir, 3000);
|
|
|
- SyncRequestProcessor.setSnapCount(10000);
|
|
|
- f = ServerCnxnFactory.createFactory(PORT, -1);
|
|
|
- f.startup(zks);
|
|
|
- Assert.assertTrue("waiting for server being up ", ClientBase
|
|
|
- .waitForServerUp(HOSTPORT, CONNECTION_TIMEOUT));
|
|
|
- long fZxid = zks.getZKDatabase().getDataTreeLastProcessedZxid();
|
|
|
-
|
|
|
- // Verify lastProcessedZxid is set correctly
|
|
|
- Assert.assertTrue("Restore failed expected zxid=" + eZxid + " found="
|
|
|
- + fZxid, fZxid == eZxid);
|
|
|
- zk = getConnectedZkClient();
|
|
|
-
|
|
|
- // Verify correctness of data and whether sequential znode creation
|
|
|
- // proceeds correctly after this point
|
|
|
- String[] children;
|
|
|
- String path;
|
|
|
- try {
|
|
|
- children = zk.getChildren("/invalidsnap", false).toArray(
|
|
|
- new String[0]);
|
|
|
- path = zk.create("/invalidsnap/test-", new byte[0],
|
|
|
- Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
|
|
|
- } finally {
|
|
|
- zk.close();
|
|
|
- }
|
|
|
- LOG.info("Expected " + expectedPath + " found " + path);
|
|
|
- Assert.assertTrue("Error in sequential znode creation expected "
|
|
|
- + expectedPath + " found " + path, path.equals(expectedPath));
|
|
|
- Assert.assertTrue("Unexpected number of children " + children.length
|
|
|
- + " expected " + NUM_MESSAGES,
|
|
|
- (children.length == NUM_MESSAGES));
|
|
|
- f.shutdown();
|
|
|
- zks.shutdown();
|
|
|
- }
|
|
|
+ final String hostPort = HOST + PortAssignment.unique();
|
|
|
+ // setup a single server cluster
|
|
|
+ File tmpDir = ClientBase.createTmpDir();
|
|
|
+ ClientBase.setupTestEnv();
|
|
|
+ ZooKeeperServer zks = new ZooKeeperServer(tmpDir, tmpDir, 3000);
|
|
|
+ SyncRequestProcessor.setSnapCount(10000);
|
|
|
+ final int PORT = Integer.parseInt(hostPort.split(":")[1]);
|
|
|
+ ServerCnxnFactory f = ServerCnxnFactory.createFactory(PORT, -1);
|
|
|
+ f.startup(zks);
|
|
|
+ Assert.assertTrue("waiting for server being up ", ClientBase
|
|
|
+ .waitForServerUp(hostPort, CONNECTION_TIMEOUT));
|
|
|
+ ZooKeeper zk = getConnectedZkClient(hostPort);
|
|
|
+
|
|
|
+ // generate some transactions
|
|
|
+ String lastPath = null;
|
|
|
+ try {
|
|
|
+ zk.create("/invalidsnap", new byte[0], Ids.OPEN_ACL_UNSAFE,
|
|
|
+ CreateMode.PERSISTENT);
|
|
|
+ for (int i = 0; i < NUM_MESSAGES; i++) {
|
|
|
+ lastPath = zk.create("/invalidsnap/test-", new byte[0],
|
|
|
+ Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
|
|
|
+ }
|
|
|
+ } finally {
|
|
|
+ zk.close();
|
|
|
+ }
|
|
|
+ String[] tokens = lastPath.split("-");
|
|
|
+ String expectedPath = "/invalidsnap/test-"
|
|
|
+ + String.format("%010d",
|
|
|
+ (new Integer(tokens[1])).intValue() + 1);
|
|
|
+ long eZxid = zks.getZKDatabase().getDataTreeLastProcessedZxid();
|
|
|
+ // force the zxid to be behind the content
|
|
|
+ zks.getZKDatabase().setlastProcessedZxid(
|
|
|
+ zks.getZKDatabase().getDataTreeLastProcessedZxid() - 10);
|
|
|
+ LOG.info("Set lastProcessedZxid to "
|
|
|
+ + zks.getZKDatabase().getDataTreeLastProcessedZxid());
|
|
|
+ // Force snapshot and restore
|
|
|
+ zks.takeSnapshot();
|
|
|
+ zks.shutdown();
|
|
|
+ f.shutdown();
|
|
|
+
|
|
|
+ zks = new ZooKeeperServer(tmpDir, tmpDir, 3000);
|
|
|
+ SyncRequestProcessor.setSnapCount(10000);
|
|
|
+ f = ServerCnxnFactory.createFactory(PORT, -1);
|
|
|
+ f.startup(zks);
|
|
|
+ Assert.assertTrue("waiting for server being up ", ClientBase
|
|
|
+ .waitForServerUp(hostPort, CONNECTION_TIMEOUT));
|
|
|
+ long fZxid = zks.getZKDatabase().getDataTreeLastProcessedZxid();
|
|
|
+
|
|
|
+ // Verify lastProcessedZxid is set correctly
|
|
|
+ Assert.assertTrue("Restore failed expected zxid=" + eZxid + " found="
|
|
|
+ + fZxid, fZxid == eZxid);
|
|
|
+ zk = getConnectedZkClient(hostPort);
|
|
|
+
|
|
|
+ // Verify correctness of data and whether sequential znode creation
|
|
|
+ // proceeds correctly after this point
|
|
|
+ String[] children;
|
|
|
+ String path;
|
|
|
+ try {
|
|
|
+ children = zk.getChildren("/invalidsnap", false).toArray(
|
|
|
+ new String[0]);
|
|
|
+ path = zk.create("/invalidsnap/test-", new byte[0],
|
|
|
+ Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
|
|
|
+ } finally {
|
|
|
+ zk.close();
|
|
|
+ }
|
|
|
+ LOG.info("Expected " + expectedPath + " found " + path);
|
|
|
+ Assert.assertTrue("Error in sequential znode creation expected "
|
|
|
+ + expectedPath + " found " + path, path.equals(expectedPath));
|
|
|
+ Assert.assertTrue("Unexpected number of children " + children.length
|
|
|
+ + " expected " + NUM_MESSAGES,
|
|
|
+ (children.length == NUM_MESSAGES));
|
|
|
+ f.shutdown();
|
|
|
+ zks.shutdown();
|
|
|
+ }
|
|
|
|
|
|
/**
|
|
|
* Test we can restore a snapshot that has errors and data ahead of the zxid
|
|
@@ -410,17 +413,18 @@ public class LoadFromLogTest extends ZKTestCase {
|
|
|
*/
|
|
|
@Test
|
|
|
public void testRestoreWithTransactionErrors() throws Exception {
|
|
|
+ final String hostPort = HOST + PortAssignment.unique();
|
|
|
// setup a single server cluster
|
|
|
File tmpDir = ClientBase.createTmpDir();
|
|
|
ClientBase.setupTestEnv();
|
|
|
ZooKeeperServer zks = new ZooKeeperServer(tmpDir, tmpDir, 3000);
|
|
|
SyncRequestProcessor.setSnapCount(10000);
|
|
|
- final int PORT = Integer.parseInt(HOSTPORT.split(":")[1]);
|
|
|
+ final int PORT = Integer.parseInt(hostPort.split(":")[1]);
|
|
|
ServerCnxnFactory f = ServerCnxnFactory.createFactory(PORT, -1);
|
|
|
f.startup(zks);
|
|
|
Assert.assertTrue("waiting for server being up ", ClientBase
|
|
|
- .waitForServerUp(HOSTPORT, CONNECTION_TIMEOUT));
|
|
|
- ZooKeeper zk = getConnectedZkClient();
|
|
|
+ .waitForServerUp(hostPort, CONNECTION_TIMEOUT));
|
|
|
+ ZooKeeper zk = getConnectedZkClient(hostPort);
|
|
|
|
|
|
// generate some transactions
|
|
|
try {
|
|
@@ -452,7 +456,7 @@ public class LoadFromLogTest extends ZKTestCase {
|
|
|
f = ServerCnxnFactory.createFactory(PORT, -1);
|
|
|
f.startup(zks);
|
|
|
Assert.assertTrue("waiting for server being up ", ClientBase
|
|
|
- .waitForServerUp(HOSTPORT, CONNECTION_TIMEOUT));
|
|
|
+ .waitForServerUp(hostPort, CONNECTION_TIMEOUT));
|
|
|
|
|
|
f.shutdown();
|
|
|
zks.shutdown();
|
|
@@ -464,19 +468,19 @@ public class LoadFromLogTest extends ZKTestCase {
|
|
|
@Test
|
|
|
public void testDatadirAutocreate() throws Exception {
|
|
|
ClientBase.setupTestEnv();
|
|
|
-
|
|
|
+ final String hostPort = HOST + PortAssignment.unique();
|
|
|
// first verify the default (autocreate on) works
|
|
|
File tmpDir = ClientBase.createTmpDir();
|
|
|
ZooKeeperServer zks = new ZooKeeperServer(tmpDir, tmpDir, 3000);
|
|
|
- final int PORT = Integer.parseInt(HOSTPORT.split(":")[1]);
|
|
|
+ final int PORT = Integer.parseInt(hostPort.split(":")[1]);
|
|
|
ServerCnxnFactory f = ServerCnxnFactory.createFactory(PORT, -1);
|
|
|
f.startup(zks);
|
|
|
Assert.assertTrue("waiting for server being up ", ClientBase
|
|
|
- .waitForServerUp(HOSTPORT, CONNECTION_TIMEOUT));
|
|
|
+ .waitForServerUp(hostPort, CONNECTION_TIMEOUT));
|
|
|
zks.shutdown();
|
|
|
f.shutdown();
|
|
|
Assert.assertTrue("waiting for server being down ", ClientBase
|
|
|
- .waitForServerDown(HOSTPORT, CONNECTION_TIMEOUT));
|
|
|
+ .waitForServerDown(hostPort, CONNECTION_TIMEOUT));
|
|
|
|
|
|
try {
|
|
|
// now verify autocreate off works
|
|
@@ -487,7 +491,7 @@ public class LoadFromLogTest extends ZKTestCase {
|
|
|
f = ServerCnxnFactory.createFactory(PORT, -1);
|
|
|
f.startup(zks);
|
|
|
Assert.assertTrue("waiting for server being up ", ClientBase
|
|
|
- .waitForServerUp(HOSTPORT, CONNECTION_TIMEOUT));
|
|
|
+ .waitForServerUp(hostPort, CONNECTION_TIMEOUT));
|
|
|
|
|
|
Assert.fail("Server should not have started without datadir");
|
|
|
} catch (IOException e) {
|
|
@@ -504,17 +508,18 @@ public class LoadFromLogTest extends ZKTestCase {
|
|
|
*/
|
|
|
@Test
|
|
|
public void testReloadSnapshotWithMissingParent() throws Exception {
|
|
|
+ final String hostPort = HOST + PortAssignment.unique();
|
|
|
// setup a single server cluster
|
|
|
File tmpDir = ClientBase.createTmpDir();
|
|
|
ClientBase.setupTestEnv();
|
|
|
ZooKeeperServer zks = new ZooKeeperServer(tmpDir, tmpDir, 3000);
|
|
|
SyncRequestProcessor.setSnapCount(10000);
|
|
|
- final int PORT = Integer.parseInt(HOSTPORT.split(":")[1]);
|
|
|
+ final int PORT = Integer.parseInt(hostPort.split(":")[1]);
|
|
|
ServerCnxnFactory f = ServerCnxnFactory.createFactory(PORT, -1);
|
|
|
f.startup(zks);
|
|
|
Assert.assertTrue("waiting for server being up ",
|
|
|
- ClientBase.waitForServerUp(HOSTPORT, CONNECTION_TIMEOUT));
|
|
|
- ZooKeeper zk = getConnectedZkClient();
|
|
|
+ ClientBase.waitForServerUp(hostPort, CONNECTION_TIMEOUT));
|
|
|
+ ZooKeeper zk = getConnectedZkClient(hostPort);
|
|
|
|
|
|
// create transactions to create the snapshot with create/delete pattern
|
|
|
zk.create("/a", "".getBytes(), Ids.OPEN_ACL_UNSAFE,
|
|
@@ -539,12 +544,12 @@ public class LoadFromLogTest extends ZKTestCase {
|
|
|
f = ServerCnxnFactory.createFactory(PORT, -1);
|
|
|
f.startup(zks);
|
|
|
Assert.assertTrue("waiting for server being up ",
|
|
|
- ClientBase.waitForServerUp(HOSTPORT, CONNECTION_TIMEOUT));
|
|
|
+ ClientBase.waitForServerUp(hostPort, CONNECTION_TIMEOUT));
|
|
|
f.shutdown();
|
|
|
}
|
|
|
|
|
|
- private ZooKeeper getConnectedZkClient() throws Exception {
|
|
|
- ZooKeeper zk = ClientBase.createZKClient(HOSTPORT);
|
|
|
+ private ZooKeeper getConnectedZkClient(String host) throws Exception {
|
|
|
+ ZooKeeper zk = ClientBase.createZKClient(host);
|
|
|
return zk;
|
|
|
}
|
|
|
}
|