|
@@ -20,20 +20,18 @@ package org.apache.zookeeper.test;
|
|
|
|
|
|
import java.io.IOException;
|
|
import java.io.IOException;
|
|
import java.util.ArrayList;
|
|
import java.util.ArrayList;
|
|
-import java.util.Arrays;
|
|
|
|
import java.util.Collections;
|
|
import java.util.Collections;
|
|
import java.util.Date;
|
|
import java.util.Date;
|
|
import java.util.List;
|
|
import java.util.List;
|
|
-import java.util.concurrent.CountDownLatch;
|
|
|
|
import java.util.concurrent.LinkedBlockingQueue;
|
|
import java.util.concurrent.LinkedBlockingQueue;
|
|
import java.util.concurrent.TimeUnit;
|
|
import java.util.concurrent.TimeUnit;
|
|
|
|
|
|
import org.apache.log4j.Logger;
|
|
import org.apache.log4j.Logger;
|
|
import org.apache.zookeeper.KeeperException;
|
|
import org.apache.zookeeper.KeeperException;
|
|
-import org.apache.zookeeper.Watcher;
|
|
|
|
import org.apache.zookeeper.ZooKeeper;
|
|
import org.apache.zookeeper.ZooKeeper;
|
|
import org.apache.zookeeper.KeeperException.Code;
|
|
import org.apache.zookeeper.KeeperException.Code;
|
|
import org.apache.zookeeper.KeeperException.InvalidACLException;
|
|
import org.apache.zookeeper.KeeperException.InvalidACLException;
|
|
|
|
+import org.apache.zookeeper.Watcher.Event;
|
|
import org.apache.zookeeper.ZooDefs.CreateFlags;
|
|
import org.apache.zookeeper.ZooDefs.CreateFlags;
|
|
import org.apache.zookeeper.ZooDefs.Ids;
|
|
import org.apache.zookeeper.ZooDefs.Ids;
|
|
import org.apache.zookeeper.ZooDefs.Perms;
|
|
import org.apache.zookeeper.ZooDefs.Perms;
|
|
@@ -43,33 +41,14 @@ import org.apache.zookeeper.data.Stat;
|
|
import org.apache.zookeeper.proto.WatcherEvent;
|
|
import org.apache.zookeeper.proto.WatcherEvent;
|
|
import org.junit.Test;
|
|
import org.junit.Test;
|
|
|
|
|
|
-public class ClientTest extends ClientBase implements Watcher {
|
|
|
|
|
|
+public class ClientTest extends ClientBase {
|
|
protected static final Logger LOG = Logger.getLogger(ClientTest.class);
|
|
protected static final Logger LOG = Logger.getLogger(ClientTest.class);
|
|
|
|
|
|
LinkedBlockingQueue<WatcherEvent> events =
|
|
LinkedBlockingQueue<WatcherEvent> events =
|
|
new LinkedBlockingQueue<WatcherEvent>();
|
|
new LinkedBlockingQueue<WatcherEvent>();
|
|
- protected volatile CountDownLatch clientConnected;
|
|
|
|
-
|
|
|
|
- protected ZooKeeper createClient(Watcher watcher)
|
|
|
|
- throws IOException, InterruptedException
|
|
|
|
- {
|
|
|
|
- return createClient(watcher, hostPort);
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- protected ZooKeeper createClient(Watcher watcher, String hp)
|
|
|
|
- throws IOException, InterruptedException
|
|
|
|
- {
|
|
|
|
- clientConnected = new CountDownLatch(1);
|
|
|
|
- ZooKeeper zk = new ZooKeeper(hp, 20000, watcher);
|
|
|
|
- if (!clientConnected.await(CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS)) {
|
|
|
|
- fail("Unable to connect to server");
|
|
|
|
- }
|
|
|
|
- return zk;
|
|
|
|
- }
|
|
|
|
|
|
|
|
@Override
|
|
@Override
|
|
protected void tearDown() throws Exception {
|
|
protected void tearDown() throws Exception {
|
|
- clientConnected = null;
|
|
|
|
super.tearDown();
|
|
super.tearDown();
|
|
LOG.info("FINISHED " + getName());
|
|
LOG.info("FINISHED " + getName());
|
|
}
|
|
}
|
|
@@ -79,8 +58,8 @@ public class ClientTest extends ClientBase implements Watcher {
|
|
ZooKeeper zkIdle = null;
|
|
ZooKeeper zkIdle = null;
|
|
ZooKeeper zkWatchCreator = null;
|
|
ZooKeeper zkWatchCreator = null;
|
|
try {
|
|
try {
|
|
- zkIdle = createClient(this);
|
|
|
|
- zkWatchCreator = createClient(this);
|
|
|
|
|
|
+ zkIdle = createClient();
|
|
|
|
+ zkWatchCreator = createClient();
|
|
for (int i = 0; i < 30; i++) {
|
|
for (int i = 0; i < 30; i++) {
|
|
zkWatchCreator.create("/" + i, new byte[0], Ids.OPEN_ACL_UNSAFE, 0);
|
|
zkWatchCreator.create("/" + i, new byte[0], Ids.OPEN_ACL_UNSAFE, 0);
|
|
}
|
|
}
|
|
@@ -121,12 +100,13 @@ public class ClientTest extends ClientBase implements Watcher {
|
|
public void testACLs() throws Exception {
|
|
public void testACLs() throws Exception {
|
|
ZooKeeper zk = null;
|
|
ZooKeeper zk = null;
|
|
try {
|
|
try {
|
|
- zk = createClient(this);
|
|
|
|
|
|
+ zk = createClient();
|
|
try {
|
|
try {
|
|
zk.create("/acltest", new byte[0], Ids.CREATOR_ALL_ACL, 0);
|
|
zk.create("/acltest", new byte[0], Ids.CREATOR_ALL_ACL, 0);
|
|
fail("Should have received an invalid acl error");
|
|
fail("Should have received an invalid acl error");
|
|
} catch(InvalidACLException e) {
|
|
} catch(InvalidACLException e) {
|
|
- LOG.error("Invalid acl", e);
|
|
|
|
|
|
+ LOG.info("Test successful, invalid acl received : "
|
|
|
|
+ + e.getMessage());
|
|
}
|
|
}
|
|
try {
|
|
try {
|
|
ArrayList<ACL> testACL = new ArrayList<ACL>();
|
|
ArrayList<ACL> testACL = new ArrayList<ACL>();
|
|
@@ -135,12 +115,13 @@ public class ClientTest extends ClientBase implements Watcher {
|
|
zk.create("/acltest", new byte[0], testACL, 0);
|
|
zk.create("/acltest", new byte[0], testACL, 0);
|
|
fail("Should have received an invalid acl error");
|
|
fail("Should have received an invalid acl error");
|
|
} catch(InvalidACLException e) {
|
|
} catch(InvalidACLException e) {
|
|
- LOG.error("Invalid acl", e);
|
|
|
|
|
|
+ LOG.info("Test successful, invalid acl received : "
|
|
|
|
+ + e.getMessage());
|
|
}
|
|
}
|
|
zk.addAuthInfo("digest", "ben:passwd".getBytes());
|
|
zk.addAuthInfo("digest", "ben:passwd".getBytes());
|
|
zk.create("/acltest", new byte[0], Ids.CREATOR_ALL_ACL, 0);
|
|
zk.create("/acltest", new byte[0], Ids.CREATOR_ALL_ACL, 0);
|
|
zk.close();
|
|
zk.close();
|
|
- zk = createClient(this);
|
|
|
|
|
|
+ zk = createClient();
|
|
zk.addAuthInfo("digest", "ben:passwd2".getBytes());
|
|
zk.addAuthInfo("digest", "ben:passwd2".getBytes());
|
|
try {
|
|
try {
|
|
zk.getData("/acltest", false, new Stat());
|
|
zk.getData("/acltest", false, new Stat());
|
|
@@ -152,7 +133,7 @@ public class ClientTest extends ClientBase implements Watcher {
|
|
zk.getData("/acltest", false, new Stat());
|
|
zk.getData("/acltest", false, new Stat());
|
|
zk.setACL("/acltest", Ids.OPEN_ACL_UNSAFE, -1);
|
|
zk.setACL("/acltest", Ids.OPEN_ACL_UNSAFE, -1);
|
|
zk.close();
|
|
zk.close();
|
|
- zk = createClient(this);
|
|
|
|
|
|
+ zk = createClient();
|
|
zk.getData("/acltest", false, new Stat());
|
|
zk.getData("/acltest", false, new Stat());
|
|
List<ACL> acls = zk.getACL("/acltest", new Stat());
|
|
List<ACL> acls = zk.getACL("/acltest", new Stat());
|
|
assertEquals(1, acls.size());
|
|
assertEquals(1, acls.size());
|
|
@@ -165,11 +146,25 @@ public class ClientTest extends ClientBase implements Watcher {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
- private void performClientTest(boolean withWatcherObj) throws IOException,
|
|
|
|
- InterruptedException, KeeperException {
|
|
|
|
|
|
+ protected class MyWatcher extends CountdownWatcher {
|
|
|
|
+ public void process(WatcherEvent event) {
|
|
|
|
+ super.process(event);
|
|
|
|
+ if (event.getType() != Event.EventNone) {
|
|
|
|
+ try {
|
|
|
|
+ events.put(event);
|
|
|
|
+ } catch (InterruptedException e) {
|
|
|
|
+ LOG.warn("ignoring interrupt during event.put");
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private void performClientTest(boolean withWatcherObj)
|
|
|
|
+ throws IOException, InterruptedException, KeeperException
|
|
|
|
+ {
|
|
ZooKeeper zk = null;
|
|
ZooKeeper zk = null;
|
|
try {
|
|
try {
|
|
- zk =createClient(this);
|
|
|
|
|
|
+ zk = createClient(new MyWatcher(), hostPort);
|
|
//LOG.info("Created client: " + zk.describeCNXN());
|
|
//LOG.info("Created client: " + zk.describeCNXN());
|
|
LOG.info("Before create /benwashere");
|
|
LOG.info("Before create /benwashere");
|
|
zk.create("/benwashere", "".getBytes(), Ids.OPEN_ACL_UNSAFE, 0);
|
|
zk.create("/benwashere", "".getBytes(), Ids.OPEN_ACL_UNSAFE, 0);
|
|
@@ -188,7 +183,7 @@ public class ClientTest extends ClientBase implements Watcher {
|
|
zk.close();
|
|
zk.close();
|
|
//LOG.info("Closed client: " + zk.describeCNXN());
|
|
//LOG.info("Closed client: " + zk.describeCNXN());
|
|
Thread.sleep(2000);
|
|
Thread.sleep(2000);
|
|
- zk = createClient(this);
|
|
|
|
|
|
+ zk = createClient(new MyWatcher(), hostPort);
|
|
//LOG.info("Created a new client: " + zk.describeCNXN());
|
|
//LOG.info("Created a new client: " + zk.describeCNXN());
|
|
LOG.info("Before delete /");
|
|
LOG.info("Before delete /");
|
|
|
|
|
|
@@ -213,7 +208,7 @@ public class ClientTest extends ClientBase implements Watcher {
|
|
if (withWatcherObj) {
|
|
if (withWatcherObj) {
|
|
assertEquals(null, zk.exists("/frog", new MyWatcher()));
|
|
assertEquals(null, zk.exists("/frog", new MyWatcher()));
|
|
} else {
|
|
} else {
|
|
- assertEquals(null, zk.exists("/frog", true));
|
|
|
|
|
|
+ assertEquals(null, zk.exists("/frog", true));
|
|
}
|
|
}
|
|
LOG.info("Comment: asseting passed for frog setting /");
|
|
LOG.info("Comment: asseting passed for frog setting /");
|
|
} catch (KeeperException.NoNodeException e) {
|
|
} catch (KeeperException.NoNodeException e) {
|
|
@@ -292,14 +287,17 @@ public class ClientTest extends ClientBase implements Watcher {
|
|
|
|
|
|
// Test that sequential filenames are being created correctly,
|
|
// Test that sequential filenames are being created correctly,
|
|
// with 0-padding in the filename
|
|
// with 0-padding in the filename
|
|
- public void testSequentialNodeNames() throws IOException, InterruptedException, KeeperException {
|
|
|
|
|
|
+ @Test
|
|
|
|
+ public void testSequentialNodeNames()
|
|
|
|
+ throws IOException, InterruptedException, KeeperException
|
|
|
|
+ {
|
|
String path = "/SEQUENCE";
|
|
String path = "/SEQUENCE";
|
|
- String file = "TEST";
|
|
|
|
- String filepath = path + "/" + file;
|
|
|
|
|
|
+ String file = "TEST";
|
|
|
|
+ String filepath = path + "/" + file;
|
|
|
|
|
|
ZooKeeper zk = null;
|
|
ZooKeeper zk = null;
|
|
try {
|
|
try {
|
|
- zk =createClient(this);
|
|
|
|
|
|
+ zk = createClient();
|
|
zk.create(path, new byte[0], Ids.OPEN_ACL_UNSAFE, 0);
|
|
zk.create(path, new byte[0], Ids.OPEN_ACL_UNSAFE, 0);
|
|
zk.create(filepath, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateFlags.SEQUENCE);
|
|
zk.create(filepath, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateFlags.SEQUENCE);
|
|
List<String> children = zk.getChildren(path, false);
|
|
List<String> children = zk.getChildren(path, false);
|
|
@@ -351,7 +349,7 @@ public class ClientTest extends ClientBase implements Watcher {
|
|
|
|
|
|
@Test
|
|
@Test
|
|
public void testDeleteWithChildren() throws Exception {
|
|
public void testDeleteWithChildren() throws Exception {
|
|
- ZooKeeper zk = createClient(this);
|
|
|
|
|
|
+ ZooKeeper zk = createClient();
|
|
zk.create("/parent", new byte[0], Ids.OPEN_ACL_UNSAFE, 0);
|
|
zk.create("/parent", new byte[0], Ids.OPEN_ACL_UNSAFE, 0);
|
|
zk.create("/parent/child", new byte[0], Ids.OPEN_ACL_UNSAFE, 0);
|
|
zk.create("/parent/child", new byte[0], Ids.OPEN_ACL_UNSAFE, 0);
|
|
try {
|
|
try {
|
|
@@ -364,20 +362,27 @@ public class ClientTest extends ClientBase implements Watcher {
|
|
zk.delete("/parent", -1);
|
|
zk.delete("/parent", -1);
|
|
zk.close();
|
|
zk.close();
|
|
}
|
|
}
|
|
-
|
|
|
|
- private static class HammerThread extends Thread {
|
|
|
|
- private static final long LATENCY = 5;
|
|
|
|
|
|
+
|
|
|
|
+ private static final long HAMMERTHREAD_LATENCY = 5;
|
|
|
|
+
|
|
|
|
+ private static abstract class HammerThread extends Thread {
|
|
|
|
+ protected final int count;
|
|
|
|
+ protected volatile int current = 0;
|
|
|
|
|
|
|
|
+ HammerThread(String name, int count) {
|
|
|
|
+ super(name);
|
|
|
|
+ this.count = count;
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private static class BasicHammerThread extends HammerThread {
|
|
private final ZooKeeper zk;
|
|
private final ZooKeeper zk;
|
|
private final String prefix;
|
|
private final String prefix;
|
|
- private final int count;
|
|
|
|
- private volatile int current = 0;
|
|
|
|
|
|
|
|
- HammerThread(String name, ZooKeeper zk, String prefix, int count) {
|
|
|
|
- super(name);
|
|
|
|
|
|
+ BasicHammerThread(String name, ZooKeeper zk, String prefix, int count) {
|
|
|
|
+ super(name, count);
|
|
this.zk = zk;
|
|
this.zk = zk;
|
|
this.prefix = prefix;
|
|
this.prefix = prefix;
|
|
- this.count = count;
|
|
|
|
}
|
|
}
|
|
|
|
|
|
public void run() {
|
|
public void run() {
|
|
@@ -385,87 +390,129 @@ public class ClientTest extends ClientBase implements Watcher {
|
|
try {
|
|
try {
|
|
for (; current < count; current++) {
|
|
for (; current < count; current++) {
|
|
// Simulate a bit of network latency...
|
|
// Simulate a bit of network latency...
|
|
- Thread.sleep(LATENCY);
|
|
|
|
|
|
+ Thread.sleep(HAMMERTHREAD_LATENCY);
|
|
zk.create(prefix + current, b, Ids.OPEN_ACL_UNSAFE, 0);
|
|
zk.create(prefix + current, b, Ids.OPEN_ACL_UNSAFE, 0);
|
|
}
|
|
}
|
|
- } catch (Exception e) {
|
|
|
|
- LOG.error("Client create operation failed", e);
|
|
|
|
|
|
+ } catch (Throwable t) {
|
|
|
|
+ LOG.error("Client create operation failed", t);
|
|
} finally {
|
|
} finally {
|
|
- if (zk != null) {
|
|
|
|
- try {
|
|
|
|
- zk.close();
|
|
|
|
- } catch (InterruptedException e) {
|
|
|
|
- LOG.warn("Unexpected", e);
|
|
|
|
- }
|
|
|
|
|
|
+ try {
|
|
|
|
+ zk.close();
|
|
|
|
+ } catch (InterruptedException e) {
|
|
|
|
+ LOG.warn("Unexpected", e);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
- /*
|
|
|
|
- * Verify that all of the servers see the same number of nodes
|
|
|
|
- * at the root
|
|
|
|
- */
|
|
|
|
- void verifyRootOfAllServersMatch(String hostPort)
|
|
|
|
- throws InterruptedException, KeeperException, IOException
|
|
|
|
- {
|
|
|
|
- String parts[] = hostPort.split(",");
|
|
|
|
|
|
+ private static class SuperHammerThread extends HammerThread {
|
|
|
|
+ private final ClientTest parent;
|
|
|
|
+ private final String prefix;
|
|
|
|
|
|
- // run through till the counts no longer change on each server
|
|
|
|
- // max 15 tries, with 2 second sleeps, so approx 30 seconds
|
|
|
|
- int[] counts = new int[parts.length];
|
|
|
|
- for (int j = 0; j < 100; j++) {
|
|
|
|
- int newcounts[] = new int[parts.length];
|
|
|
|
- int i = 0;
|
|
|
|
- for (String hp : parts) {
|
|
|
|
- ZooKeeper zk = createClient(this, hp);
|
|
|
|
- try {
|
|
|
|
- newcounts[i++] = zk.getChildren("/", false).size();
|
|
|
|
- } finally {
|
|
|
|
- zk.close();
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
|
|
+ SuperHammerThread(String name, ClientTest parent, String prefix,
|
|
|
|
+ int count)
|
|
|
|
+ {
|
|
|
|
+ super(name, count);
|
|
|
|
+ this.parent = parent;
|
|
|
|
+ this.prefix = prefix;
|
|
|
|
+ }
|
|
|
|
|
|
- if (Arrays.equals(newcounts, counts)) {
|
|
|
|
- LOG.info("Found match with array:"
|
|
|
|
- + Arrays.toString(newcounts));
|
|
|
|
- counts = newcounts;
|
|
|
|
- break;
|
|
|
|
- } else {
|
|
|
|
- counts = newcounts;
|
|
|
|
- Thread.sleep(10000);
|
|
|
|
|
|
+ public void run() {
|
|
|
|
+ byte b[] = new byte[256];
|
|
|
|
+ try {
|
|
|
|
+ for (; current < count; current++) {
|
|
|
|
+ ZooKeeper zk = parent.createClient();
|
|
|
|
+ try {
|
|
|
|
+ zk.create(prefix + current, b, Ids.OPEN_ACL_UNSAFE, 0);
|
|
|
|
+ } finally {
|
|
|
|
+ try {
|
|
|
|
+ zk.close();
|
|
|
|
+ } catch (InterruptedException e) {
|
|
|
|
+ LOG.warn("Unexpected", e);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ } catch (Throwable t) {
|
|
|
|
+ LOG.error("Client create operation failed", t);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
+ }
|
|
|
|
|
|
- // verify all the servers reporting same number of nodes
|
|
|
|
- for (int i = 1; i < parts.length; i++) {
|
|
|
|
- assertEquals("node count not consistent", counts[i-1], counts[i]);
|
|
|
|
|
|
+ /**
|
|
|
|
+ * Separate threads each creating a number of nodes. Each thread
|
|
|
|
+ * is using a non-shared (owned by thread) client for all node creations.
|
|
|
|
+ * @throws Throwable
|
|
|
|
+ */
|
|
|
|
+ @Test
|
|
|
|
+ public void testHammerBasic() throws Throwable {
|
|
|
|
+ try {
|
|
|
|
+ final int threadCount = 10;
|
|
|
|
+ final int childCount = 1000;
|
|
|
|
+
|
|
|
|
+ HammerThread[] threads = new HammerThread[threadCount];
|
|
|
|
+ long start = System.currentTimeMillis();
|
|
|
|
+ for (int i = 0; i < threads.length; i++) {
|
|
|
|
+ ZooKeeper zk = createClient();
|
|
|
|
+ String prefix = "/test-" + i;
|
|
|
|
+ zk.create(prefix, new byte[0], Ids.OPEN_ACL_UNSAFE, 0);
|
|
|
|
+ prefix += "/";
|
|
|
|
+ HammerThread thread =
|
|
|
|
+ new BasicHammerThread("BasicHammerThread-" + i, zk, prefix,
|
|
|
|
+ childCount);
|
|
|
|
+ thread.start();
|
|
|
|
+
|
|
|
|
+ threads[i] = thread;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ verifyHammer(start, threads, childCount);
|
|
|
|
+ } catch (Throwable t) {
|
|
|
|
+ LOG.error("test failed", t);
|
|
|
|
+ throw t;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
-
|
|
|
|
-
|
|
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Separate threads each creating a number of nodes. Each thread
|
|
|
|
+ * is creating a new client for each node creation.
|
|
|
|
+ * @throws Throwable
|
|
|
|
+ */
|
|
@Test
|
|
@Test
|
|
- public void testHammer()
|
|
|
|
- throws IOException, InterruptedException, KeeperException
|
|
|
|
- {
|
|
|
|
- final int threadCount = 10;
|
|
|
|
- final int childCount = 1000;
|
|
|
|
-
|
|
|
|
- HammerThread[] threads = new HammerThread[threadCount];
|
|
|
|
- long start = System.currentTimeMillis();
|
|
|
|
- for (int i = 0; i < threads.length; i++) {
|
|
|
|
- Thread.sleep(10);
|
|
|
|
- ZooKeeper zk = createClient(this);
|
|
|
|
- String prefix = "/test-" + i;
|
|
|
|
- zk.create(prefix, new byte[0], Ids.OPEN_ACL_UNSAFE, 0);
|
|
|
|
- prefix += "/";
|
|
|
|
- HammerThread thread =
|
|
|
|
- new HammerThread("HammerThread-" + i, zk, prefix, childCount);
|
|
|
|
- thread.start();
|
|
|
|
|
|
+ public void testHammerSuper() throws Throwable {
|
|
|
|
+ try {
|
|
|
|
+ final int threadCount = 5;
|
|
|
|
+ final int childCount = 10;
|
|
|
|
+
|
|
|
|
+ HammerThread[] threads = new HammerThread[threadCount];
|
|
|
|
+ long start = System.currentTimeMillis();
|
|
|
|
+ for (int i = 0; i < threads.length; i++) {
|
|
|
|
+ String prefix = "/test-" + i;
|
|
|
|
+ {
|
|
|
|
+ ZooKeeper zk = createClient();
|
|
|
|
+ try {
|
|
|
|
+ zk.create(prefix, new byte[0], Ids.OPEN_ACL_UNSAFE, 0);
|
|
|
|
+ } finally {
|
|
|
|
+ zk.close();
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ prefix += "/";
|
|
|
|
+ HammerThread thread =
|
|
|
|
+ new SuperHammerThread("SuperHammerThread-" + i, this,
|
|
|
|
+ prefix, childCount);
|
|
|
|
+ thread.start();
|
|
|
|
+
|
|
|
|
+ threads[i] = thread;
|
|
|
|
+ }
|
|
|
|
|
|
- threads[i] = thread;
|
|
|
|
|
|
+ verifyHammer(start, threads, childCount);
|
|
|
|
+ } catch (Throwable t) {
|
|
|
|
+ LOG.error("test failed", t);
|
|
|
|
+ throw t;
|
|
}
|
|
}
|
|
-
|
|
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ public void verifyHammer(long start, HammerThread[] threads, int childCount)
|
|
|
|
+ throws IOException, InterruptedException, KeeperException
|
|
|
|
+ {
|
|
// look for the clients to finish their create operations
|
|
// look for the clients to finish their create operations
|
|
LOG.info("Starting check for completed hammers");
|
|
LOG.info("Starting check for completed hammers");
|
|
int workingCount = threads.length;
|
|
int workingCount = threads.length;
|
|
@@ -493,44 +540,75 @@ public class ClientTest extends ClientBase implements Watcher {
|
|
for (HammerThread h : threads) {
|
|
for (HammerThread h : threads) {
|
|
final int safetyFactor = 3;
|
|
final int safetyFactor = 3;
|
|
verifyThreadTerminated(h,
|
|
verifyThreadTerminated(h,
|
|
- threadCount * childCount
|
|
|
|
- * HammerThread.LATENCY * safetyFactor);
|
|
|
|
|
|
+ threads.length * childCount
|
|
|
|
+ * HAMMERTHREAD_LATENCY * safetyFactor);
|
|
}
|
|
}
|
|
LOG.info(new Date() + " Total time "
|
|
LOG.info(new Date() + " Total time "
|
|
+ (System.currentTimeMillis() - start));
|
|
+ (System.currentTimeMillis() - start));
|
|
|
|
|
|
- ZooKeeper zk = createClient(this);
|
|
|
|
-
|
|
|
|
- LOG.info("******************* Connected to ZooKeeper" + new Date());
|
|
|
|
- for (int i = 0; i < threadCount; i++) {
|
|
|
|
- LOG.info("Doing thread: " + i + " " + new Date());
|
|
|
|
- List<String> children =
|
|
|
|
- zk.getChildren("/test-" + i, false);
|
|
|
|
- assertEquals(childCount, children.size());
|
|
|
|
- }
|
|
|
|
- for (int i = 0; i < threadCount; i++) {
|
|
|
|
- List<String> children =
|
|
|
|
- zk.getChildren("/test-" + i, false);
|
|
|
|
- assertEquals(childCount, children.size());
|
|
|
|
|
|
+ ZooKeeper zk = createClient();
|
|
|
|
+ try {
|
|
|
|
+
|
|
|
|
+ LOG.info("******************* Connected to ZooKeeper" + new Date());
|
|
|
|
+ for (int i = 0; i < threads.length; i++) {
|
|
|
|
+ LOG.info("Doing thread: " + i + " " + new Date());
|
|
|
|
+ List<String> children =
|
|
|
|
+ zk.getChildren("/test-" + i, false);
|
|
|
|
+ assertEquals(childCount, children.size());
|
|
|
|
+ }
|
|
|
|
+ for (int i = 0; i < threads.length; i++) {
|
|
|
|
+ List<String> children =
|
|
|
|
+ zk.getChildren("/test-" + i, false);
|
|
|
|
+ assertEquals(childCount, children.size());
|
|
|
|
+ }
|
|
|
|
+ } finally {
|
|
|
|
+ zk.close();
|
|
}
|
|
}
|
|
}
|
|
}
|
|
-
|
|
|
|
- public class MyWatcher implements Watcher {
|
|
|
|
- public void process(WatcherEvent event) {
|
|
|
|
- ClientTest.this.process(event);
|
|
|
|
|
|
+
|
|
|
|
+ private class VerifyClientCleanup extends Thread {
|
|
|
|
+ int count;
|
|
|
|
+ int current = 0;
|
|
|
|
+
|
|
|
|
+ VerifyClientCleanup(String name, int count) {
|
|
|
|
+ super(name);
|
|
|
|
+ this.count = count;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ public void run() {
|
|
|
|
+ try {
|
|
|
|
+ for (; current < count; current++) {
|
|
|
|
+ ZooKeeper zk = createClient();
|
|
|
|
+ zk.close();
|
|
|
|
+ }
|
|
|
|
+ } catch (Throwable t) {
|
|
|
|
+ LOG.error("test failed", t);
|
|
|
|
+ }
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
- public void process(WatcherEvent event) {
|
|
|
|
- if (event.getState() == Event.KeeperStateSyncConnected) {
|
|
|
|
- clientConnected.countDown();
|
|
|
|
|
|
+ /**
|
|
|
|
+ * Verify that the client is cleaning up properly. Open/close a large
|
|
|
|
+ * number of sessions. Essentially looking to see if sockets/selectors
|
|
|
|
+ * are being cleaned up properly during close.
|
|
|
|
+ *
|
|
|
|
+ * @throws Throwable
|
|
|
|
+ */
|
|
|
|
+ @Test
|
|
|
|
+ public void testClientCleanup() throws Throwable {
|
|
|
|
+ final int threadCount = 20;
|
|
|
|
+ final int clientCount = 100;
|
|
|
|
+
|
|
|
|
+ VerifyClientCleanup threads[] = new VerifyClientCleanup[threadCount];
|
|
|
|
+
|
|
|
|
+ for (int i = 0; i < threads.length; i++) {
|
|
|
|
+ threads[i] = new VerifyClientCleanup("VCC" + i, clientCount);
|
|
|
|
+ threads[i].start();
|
|
}
|
|
}
|
|
- if (event.getType() != Event.EventNone) {
|
|
|
|
- try {
|
|
|
|
- events.put(event);
|
|
|
|
- } catch (InterruptedException e) {
|
|
|
|
- e.printStackTrace();
|
|
|
|
- }
|
|
|
|
|
|
+
|
|
|
|
+ for (int i = 0; i < threads.length; i++) {
|
|
|
|
+ threads[i].join(600000);
|
|
|
|
+ assertTrue(threads[i].current == threads[i].count);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|