|
@@ -27,14 +27,15 @@ import junit.framework.TestCase;
|
|
|
|
|
|
import org.apache.log4j.Logger;
|
|
|
import org.apache.zookeeper.CreateMode;
|
|
|
+import org.apache.zookeeper.KeeperException;
|
|
|
+import org.apache.zookeeper.TestableZooKeeper;
|
|
|
import org.apache.zookeeper.WatchedEvent;
|
|
|
-import org.apache.zookeeper.Watcher;
|
|
|
-import org.apache.zookeeper.ZooKeeper;
|
|
|
import org.apache.zookeeper.AsyncCallback.DataCallback;
|
|
|
import org.apache.zookeeper.AsyncCallback.StringCallback;
|
|
|
import org.apache.zookeeper.AsyncCallback.VoidCallback;
|
|
|
import org.apache.zookeeper.ZooDefs.Ids;
|
|
|
import org.apache.zookeeper.data.Stat;
|
|
|
+import org.apache.zookeeper.test.ClientBase.CountdownWatcher;
|
|
|
import org.junit.After;
|
|
|
import org.junit.Before;
|
|
|
import org.junit.Test;
|
|
@@ -56,6 +57,10 @@ public class AsyncHammerTest extends TestCase
|
|
|
}
|
|
|
|
|
|
protected void restart() throws Exception {
|
|
|
+ LOG.info("RESTARTING " + getName());
|
|
|
+ qb.tearDown();
|
|
|
+
|
|
|
+ // don't call setup - we don't want to reassign ports/dirs, etc...
|
|
|
JMXEnv.setUp();
|
|
|
qb.startServers();
|
|
|
}
|
|
@@ -71,21 +76,24 @@ public class AsyncHammerTest extends TestCase
|
|
|
/**
|
|
|
* Create /test- sequence nodes asynchronously, max 30 outstanding
|
|
|
*/
|
|
|
- class HammerThread extends Thread
|
|
|
- implements Watcher, StringCallback, VoidCallback
|
|
|
- {
|
|
|
+ class HammerThread extends Thread implements StringCallback, VoidCallback {
|
|
|
private static final int MAX_OUTSTANDING = 30;
|
|
|
|
|
|
- private ZooKeeper zk;
|
|
|
+ private TestableZooKeeper zk;
|
|
|
private int outstanding;
|
|
|
|
|
|
+ private volatile boolean failed = false;
|
|
|
+
|
|
|
public HammerThread(String name) {
|
|
|
super(name);
|
|
|
}
|
|
|
|
|
|
public void run() {
|
|
|
try {
|
|
|
- zk = new ZooKeeper(qb.hostPort, CONNECTION_TIMEOUT, this);
|
|
|
+ CountdownWatcher watcher = new CountdownWatcher();
|
|
|
+ zk = new TestableZooKeeper(qb.hostPort, CONNECTION_TIMEOUT,
|
|
|
+ watcher);
|
|
|
+ watcher.waitForConnected(CONNECTION_TIMEOUT);
|
|
|
while(bang) {
|
|
|
incOutstanding(); // before create otw race
|
|
|
zk.create("/test-", new byte[0], Ids.OPEN_ACL_UNSAFE,
|
|
@@ -103,8 +111,12 @@ public class AsyncHammerTest extends TestCase
|
|
|
if (zk != null) {
|
|
|
try {
|
|
|
zk.close();
|
|
|
+ if (!zk.testableWaitForShutdown(CONNECTION_TIMEOUT)) {
|
|
|
+ failed = true;
|
|
|
+ LOG.error("Client did not shutdown");
|
|
|
+ }
|
|
|
} catch (InterruptedException e) {
|
|
|
- LOG.warn("Unexpected", e);
|
|
|
+ LOG.info("Interrupted", e);
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -128,43 +140,67 @@ public class AsyncHammerTest extends TestCase
|
|
|
}
|
|
|
|
|
|
public void processResult(int rc, String path, Object ctx, String name) {
|
|
|
+ if (rc != KeeperException.Code.OK.intValue()) {
|
|
|
+ if (bang) {
|
|
|
+ failed = true;
|
|
|
+ LOG.error("Create failed for 0x"
|
|
|
+ + Long.toHexString(zk.getSessionId())
|
|
|
+ + "with rc:" + rc + " path:" + path);
|
|
|
+ }
|
|
|
+ decOutstanding();
|
|
|
+ return;
|
|
|
+ }
|
|
|
try {
|
|
|
decOutstanding();
|
|
|
- zk.delete(path, -1, this, null);
|
|
|
+ zk.delete(name, -1, this, null);
|
|
|
} catch (Exception e) {
|
|
|
- LOG.error("Client delete failed", e);
|
|
|
+ if (bang) {
|
|
|
+ failed = true;
|
|
|
+ LOG.error("Client delete failed", e);
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
|
|
|
public void processResult(int rc, String path, Object ctx) {
|
|
|
- // ignore for purposes of this test
|
|
|
+ if (rc != KeeperException.Code.OK.intValue()) {
|
|
|
+ if (bang) {
|
|
|
+ failed = true;
|
|
|
+ LOG.error("Delete failed for 0x"
|
|
|
+ + Long.toHexString(zk.getSessionId())
|
|
|
+ + "with rc:" + rc + " path:" + path);
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@Test
|
|
|
public void testHammer() throws Exception {
|
|
|
bang = true;
|
|
|
- Thread[] hammers = new Thread[100];
|
|
|
+ LOG.info("Starting hammers");
|
|
|
+ HammerThread[] hammers = new HammerThread[100];
|
|
|
for (int i = 0; i < hammers.length; i++) {
|
|
|
hammers[i] = new HammerThread("HammerThread-" + i);
|
|
|
hammers[i].start();
|
|
|
}
|
|
|
+ LOG.info("Started hammers");
|
|
|
Thread.sleep(5000); // allow the clients to run for max 5sec
|
|
|
bang = false;
|
|
|
+ LOG.info("Stopping hammers");
|
|
|
for (int i = 0; i < hammers.length; i++) {
|
|
|
hammers[i].interrupt();
|
|
|
verifyThreadTerminated(hammers[i], 60000);
|
|
|
+ assertFalse(hammers[i].failed);
|
|
|
}
|
|
|
+
|
|
|
// before restart
|
|
|
- QuorumBase qt = new QuorumBase();
|
|
|
- qt.setUp();
|
|
|
- qt.verifyRootOfAllServersMatch(qb.hostPort);
|
|
|
- tearDown();
|
|
|
+ LOG.info("Hammers stopped, verifying consistency");
|
|
|
+ qb.verifyRootOfAllServersMatch(qb.hostPort);
|
|
|
|
|
|
restart();
|
|
|
|
|
|
// after restart
|
|
|
- qt.verifyRootOfAllServersMatch(qb.hostPort);
|
|
|
+ LOG.info("Verifying hammers 2");
|
|
|
+ qb.verifyRootOfAllServersMatch(qb.hostPort);
|
|
|
}
|
|
|
|
|
|
@SuppressWarnings("unchecked")
|