|
@@ -18,9 +18,9 @@
|
|
|
|
|
|
package org.apache.zookeeper.test;
|
|
|
|
|
|
-import java.io.File;
|
|
|
import java.io.IOException;
|
|
|
import java.util.ArrayList;
|
|
|
+import java.util.Arrays;
|
|
|
import java.util.Collections;
|
|
|
import java.util.Date;
|
|
|
import java.util.List;
|
|
@@ -28,13 +28,12 @@ import java.util.concurrent.CountDownLatch;
|
|
|
import java.util.concurrent.LinkedBlockingQueue;
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
|
|
|
-import org.junit.Test;
|
|
|
-
|
|
|
+import org.apache.log4j.Logger;
|
|
|
import org.apache.zookeeper.KeeperException;
|
|
|
-import org.apache.zookeeper.KeeperException.InvalidACLException;
|
|
|
-import org.apache.zookeeper.KeeperException.Code;
|
|
|
import org.apache.zookeeper.Watcher;
|
|
|
import org.apache.zookeeper.ZooKeeper;
|
|
|
+import org.apache.zookeeper.KeeperException.Code;
|
|
|
+import org.apache.zookeeper.KeeperException.InvalidACLException;
|
|
|
import org.apache.zookeeper.ZooDefs.CreateFlags;
|
|
|
import org.apache.zookeeper.ZooDefs.Ids;
|
|
|
import org.apache.zookeeper.ZooDefs.Perms;
|
|
@@ -42,8 +41,11 @@ import org.apache.zookeeper.data.ACL;
|
|
|
import org.apache.zookeeper.data.Id;
|
|
|
import org.apache.zookeeper.data.Stat;
|
|
|
import org.apache.zookeeper.proto.WatcherEvent;
|
|
|
+import org.junit.Test;
|
|
|
|
|
|
public class ClientTest extends ClientBase implements Watcher {
|
|
|
+ protected static final Logger LOG = Logger.getLogger(ClientBase.class);
|
|
|
+
|
|
|
LinkedBlockingQueue<WatcherEvent> events =
|
|
|
new LinkedBlockingQueue<WatcherEvent>();
|
|
|
protected volatile CountDownLatch clientConnected;
|
|
@@ -51,17 +53,25 @@ public class ClientTest extends ClientBase implements Watcher {
|
|
|
protected ZooKeeper createClient(Watcher watcher)
|
|
|
throws IOException, InterruptedException
|
|
|
{
|
|
|
- clientConnected=new CountDownLatch(1);
|
|
|
- ZooKeeper zk = new ZooKeeper(hostPort, 20000, watcher);
|
|
|
- if(!clientConnected.await(CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS)){
|
|
|
+ return createClient(watcher, hostPort);
|
|
|
+ }
|
|
|
+
|
|
|
+ protected ZooKeeper createClient(Watcher watcher, String hp)
|
|
|
+ throws IOException, InterruptedException
|
|
|
+ {
|
|
|
+ clientConnected = new CountDownLatch(1);
|
|
|
+ ZooKeeper zk = new ZooKeeper(hp, 20000, watcher);
|
|
|
+ if (!clientConnected.await(CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS)) {
|
|
|
fail("Unable to connect to server");
|
|
|
}
|
|
|
return zk;
|
|
|
}
|
|
|
|
|
|
+ @Override
|
|
|
protected void tearDown() throws Exception {
|
|
|
clientConnected = null;
|
|
|
super.tearDown();
|
|
|
+ LOG.info("FINISHED " + getName());
|
|
|
}
|
|
|
|
|
|
@Test
|
|
@@ -116,6 +126,7 @@ public class ClientTest extends ClientBase implements Watcher {
|
|
|
zk.create("/acltest", new byte[0], Ids.CREATOR_ALL_ACL, 0);
|
|
|
fail("Should have received an invalid acl error");
|
|
|
} catch(InvalidACLException e) {
|
|
|
+ LOG.error("Invalid acl", e);
|
|
|
}
|
|
|
try {
|
|
|
ArrayList<ACL> testACL = new ArrayList<ACL>();
|
|
@@ -124,6 +135,7 @@ public class ClientTest extends ClientBase implements Watcher {
|
|
|
zk.create("/acltest", new byte[0], testACL, 0);
|
|
|
fail("Should have received an invalid acl error");
|
|
|
} catch(InvalidACLException e) {
|
|
|
+ LOG.error("Invalid acl", e);
|
|
|
}
|
|
|
zk.addAuthInfo("digest", "ben:passwd".getBytes());
|
|
|
zk.create("/acltest", new byte[0], Ids.CREATOR_ALL_ACL, 0);
|
|
@@ -158,10 +170,10 @@ public class ClientTest extends ClientBase implements Watcher {
|
|
|
ZooKeeper zk = null;
|
|
|
try {
|
|
|
zk =createClient(this);
|
|
|
- //System.out.println("Created client: " + zk.describeCNXN());
|
|
|
- System.out.println("Before create /benwashere");
|
|
|
+ //LOG.info("Created client: " + zk.describeCNXN());
|
|
|
+ LOG.info("Before create /benwashere");
|
|
|
zk.create("/benwashere", "".getBytes(), Ids.OPEN_ACL_UNSAFE, 0);
|
|
|
- System.out.println("After create /benwashere");
|
|
|
+ LOG.info("After create /benwashere");
|
|
|
try {
|
|
|
zk.setData("/benwashere", "hi".getBytes(), 57);
|
|
|
fail("Should have gotten BadVersion exception");
|
|
@@ -170,15 +182,15 @@ public class ClientTest extends ClientBase implements Watcher {
|
|
|
} catch (KeeperException e) {
|
|
|
fail("Should have gotten BadVersion exception");
|
|
|
}
|
|
|
- System.out.println("Before delete /benwashere");
|
|
|
+ LOG.info("Before delete /benwashere");
|
|
|
zk.delete("/benwashere", 0);
|
|
|
- System.out.println("Before delete /benwashere");
|
|
|
+ LOG.info("Before delete /benwashere");
|
|
|
zk.close();
|
|
|
- //System.out.println("Closed client: " + zk.describeCNXN());
|
|
|
+ //LOG.info("Closed client: " + zk.describeCNXN());
|
|
|
Thread.sleep(2000);
|
|
|
zk = createClient(this);
|
|
|
- //System.out.println("Created a new client: " + zk.describeCNXN());
|
|
|
- System.out.println("Before delete /");
|
|
|
+ //LOG.info("Created a new client: " + zk.describeCNXN());
|
|
|
+ LOG.info("Before delete /");
|
|
|
|
|
|
try {
|
|
|
zk.delete("/", -1);
|
|
@@ -188,9 +200,9 @@ public class ClientTest extends ClientBase implements Watcher {
|
|
|
}
|
|
|
Stat stat = new Stat();
|
|
|
// Test basic create, ls, and getData
|
|
|
- System.out.println("Before create /ben");
|
|
|
+ LOG.info("Before create /ben");
|
|
|
zk.create("/ben", "Ben was here".getBytes(), Ids.OPEN_ACL_UNSAFE, 0);
|
|
|
- System.out.println("Before getChildren /");
|
|
|
+ LOG.info("Before getChildren /");
|
|
|
List<String> children = zk.getChildren("/", false);
|
|
|
assertEquals(1, children.size());
|
|
|
assertEquals("ben", children.get(0));
|
|
@@ -203,13 +215,13 @@ public class ClientTest extends ClientBase implements Watcher {
|
|
|
} else {
|
|
|
assertEquals(null, zk.exists("/frog", true));
|
|
|
}
|
|
|
- System.out.println("Comment: asseting passed for frog setting /");
|
|
|
+ LOG.info("Comment: asseting passed for frog setting /");
|
|
|
} catch (KeeperException.NoNodeException e) {
|
|
|
// OK, expected that
|
|
|
}
|
|
|
zk.create("/frog", "hi".getBytes(), Ids.OPEN_ACL_UNSAFE, 0);
|
|
|
// the first poll is just a sesssion delivery
|
|
|
- System.out.println("Comment: checking for events length " + events.size());
|
|
|
+ LOG.info("Comment: checking for events length " + events.size());
|
|
|
WatcherEvent event = events.poll(10, TimeUnit.SECONDS);
|
|
|
assertEquals("/frog", event.getPath());
|
|
|
assertEquals(Event.EventNodeCreated, event.getType());
|
|
@@ -225,7 +237,7 @@ public class ClientTest extends ClientBase implements Watcher {
|
|
|
assertEquals(10, children.size());
|
|
|
for (int i = 0; i < 10; i++) {
|
|
|
final String name = children.get(i);
|
|
|
- assertTrue(name.startsWith(i + "-"));
|
|
|
+ assertTrue("starts with -", name.startsWith(i + "-"));
|
|
|
byte b[];
|
|
|
if (withWatcherObj) {
|
|
|
b = zk.getData("/ben/" + name, new MyWatcher(), stat);
|
|
@@ -281,9 +293,9 @@ public class ClientTest extends ClientBase implements Watcher {
|
|
|
// Test that sequential filenames are being created correctly,
|
|
|
// with 0-padding in the filename
|
|
|
public void testSequentialNodeNames() throws IOException, InterruptedException, KeeperException {
|
|
|
- String path = "/SEQUENCE";
|
|
|
- String file = "TEST";
|
|
|
- String filepath = path + "/" + file;
|
|
|
+ String path = "/SEQUENCE";
|
|
|
+ String file = "TEST";
|
|
|
+ String filepath = path + "/" + file;
|
|
|
|
|
|
ZooKeeper zk = null;
|
|
|
try {
|
|
@@ -293,135 +305,213 @@ public class ClientTest extends ClientBase implements Watcher {
|
|
|
List<String> children = zk.getChildren(path, false);
|
|
|
assertEquals(1, children.size());
|
|
|
assertEquals(file + "0000000000", children.get(0));
|
|
|
-
|
|
|
+
|
|
|
zk.create(filepath, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateFlags.SEQUENCE);
|
|
|
children = zk.getChildren(path, false);
|
|
|
assertEquals(2, children.size());
|
|
|
- assertTrue(children.contains(file + "0000000001"));
|
|
|
-
|
|
|
+ assertTrue("contains child 1",
|
|
|
+ children.contains(file + "0000000001"));
|
|
|
+
|
|
|
zk.create(filepath, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateFlags.SEQUENCE);
|
|
|
children = zk.getChildren(path, false);
|
|
|
assertEquals(3, children.size());
|
|
|
- assertTrue(children.contains(file + "0000000002"));
|
|
|
-
|
|
|
+ assertTrue("contains child 2",
|
|
|
+ children.contains(file + "0000000002"));
|
|
|
+
|
|
|
// The pattern is holding so far. Let's run the counter a bit
|
|
|
// to be sure it continues to spit out the correct answer
|
|
|
for(int i = children.size(); i < 105; i++)
|
|
|
zk.create(filepath, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateFlags.SEQUENCE);
|
|
|
|
|
|
children = zk.getChildren(path, false);
|
|
|
- assertTrue(children.contains(file + "0000000104"));
|
|
|
-
|
|
|
+ assertTrue("contains child 104",
|
|
|
+ children.contains(file + "0000000104"));
|
|
|
+
|
|
|
}
|
|
|
finally {
|
|
|
- if(zk != null)
|
|
|
- zk.close();
|
|
|
+ if(zk != null)
|
|
|
+ zk.close();
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
- private void notestConnections() throws IOException, InterruptedException, KeeperException {
|
|
|
- ZooKeeper zk;
|
|
|
- for(int i = 0; i < 2000; i++) {
|
|
|
- if (i % 100 == 0) {
|
|
|
- System.out.println("Testing " + i + " connections");
|
|
|
- }
|
|
|
- // We want to make sure socket descriptors are going away
|
|
|
- zk = new ZooKeeper(hostPort, 30000, this);
|
|
|
- zk.getData("/", false, new Stat());
|
|
|
- zk.close();
|
|
|
+
|
|
|
+// private void notestConnections()
|
|
|
+// throws IOException, InterruptedException, KeeperException
|
|
|
+// {
|
|
|
+// ZooKeeper zk;
|
|
|
+// for(int i = 0; i < 2000; i++) {
|
|
|
+// if (i % 100 == 0) {
|
|
|
+// LOG.info("Testing " + i + " connections");
|
|
|
+// }
|
|
|
+// // We want to make sure socket descriptors are going away
|
|
|
+// zk = new ZooKeeper(hostPort, 30000, this);
|
|
|
+// zk.getData("/", false, new Stat());
|
|
|
+// zk.close();
|
|
|
+// }
|
|
|
+// }
|
|
|
+
|
|
|
+ @Test
|
|
|
+ public void testDeleteWithChildren() throws Exception {
|
|
|
+ ZooKeeper zk = createClient(this);
|
|
|
+ zk.create("/parent", new byte[0], Ids.OPEN_ACL_UNSAFE, 0);
|
|
|
+ zk.create("/parent/child", new byte[0], Ids.OPEN_ACL_UNSAFE, 0);
|
|
|
+ try {
|
|
|
+ zk.delete("/parent", -1);
|
|
|
+ fail("Should have received a not equals message");
|
|
|
+ } catch (KeeperException e) {
|
|
|
+ assertEquals(KeeperException.Code.NotEmpty, e.getCode());
|
|
|
}
|
|
|
+ zk.delete("/parent/child", -1);
|
|
|
+ zk.delete("/parent", -1);
|
|
|
+ zk.close();
|
|
|
}
|
|
|
|
|
|
- static class HammerThread extends Thread {
|
|
|
- ZooKeeper zk;
|
|
|
- String prefix;
|
|
|
- int count;
|
|
|
+ private static class HammerThread extends Thread {
|
|
|
+ private static final long LATENCY = 5;
|
|
|
+
|
|
|
+ private final ZooKeeper zk;
|
|
|
+ private final String prefix;
|
|
|
+ private final int count;
|
|
|
+ private volatile int current = 0;
|
|
|
|
|
|
- HammerThread(ZooKeeper zk, String prefix, int count) {
|
|
|
+ HammerThread(String name, ZooKeeper zk, String prefix, int count) {
|
|
|
+ super(name);
|
|
|
this.zk = zk;
|
|
|
this.prefix = prefix;
|
|
|
this.count = count;
|
|
|
- start();
|
|
|
}
|
|
|
|
|
|
public void run() {
|
|
|
byte b[] = new byte[256];
|
|
|
try {
|
|
|
- for (int i = 0; i < count; i++) {
|
|
|
+ for (; current < count; current++) {
|
|
|
// Simulate a bit of network latency...
|
|
|
- Thread.sleep(5);
|
|
|
- zk.create(prefix + i, b, Ids.OPEN_ACL_UNSAFE, 0);
|
|
|
+ Thread.sleep(LATENCY);
|
|
|
+ zk.create(prefix + current, b, Ids.OPEN_ACL_UNSAFE, 0);
|
|
|
}
|
|
|
} catch (Exception e) {
|
|
|
- e.printStackTrace();
|
|
|
+ LOG.error("Client create operation failed", e);
|
|
|
+ } finally {
|
|
|
+ if (zk != null) {
|
|
|
+ try {
|
|
|
+ zk.close();
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ LOG.warn("Unexpected", e);
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
+ }
|
|
|
|
|
|
- public void close() throws IOException, InterruptedException {
|
|
|
- zk.close();
|
|
|
+ /*
|
|
|
+ * Verify that all of the servers see the same number of nodes
|
|
|
+ * at the root
|
|
|
+ */
|
|
|
+ void verifyRootOfAllServersMatch(String hostPort)
|
|
|
+ throws InterruptedException, KeeperException, IOException
|
|
|
+ {
|
|
|
+ String parts[] = hostPort.split(",");
|
|
|
+
|
|
|
+ // run through till the counts no longer change on each server
|
|
|
+ // max 15 tries, with 2 second sleeps, so approx 30 seconds
|
|
|
+ int[] counts = new int[parts.length];
|
|
|
+ for (int j = 0; j < 100; j++) {
|
|
|
+ int newcounts[] = new int[parts.length];
|
|
|
+ int i = 0;
|
|
|
+ for (String hp : parts) {
|
|
|
+ ZooKeeper zk = createClient(this, hp);
|
|
|
+ try {
|
|
|
+ newcounts[i++] = zk.getChildren("/", false).size();
|
|
|
+ } finally {
|
|
|
+ zk.close();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ if (Arrays.equals(newcounts, counts)) {
|
|
|
+ LOG.info("Found match with array:"
|
|
|
+ + Arrays.toString(newcounts));
|
|
|
+ counts = newcounts;
|
|
|
+ break;
|
|
|
+ } else {
|
|
|
+ counts = newcounts;
|
|
|
+ Thread.sleep(10000);
|
|
|
+ }
|
|
|
}
|
|
|
- }
|
|
|
|
|
|
- @Test
|
|
|
- public void testDeleteWithChildren() throws Exception {
|
|
|
- File tmpDir = File.createTempFile("test", ".junit", baseTest);
|
|
|
- tmpDir = new File(tmpDir + ".dir");
|
|
|
- tmpDir.mkdirs();
|
|
|
- ZooKeeper zk = createClient(this);
|
|
|
- zk.create("/parent", new byte[0], Ids.OPEN_ACL_UNSAFE, 0);
|
|
|
- zk.create("/parent/child", new byte[0], Ids.OPEN_ACL_UNSAFE, 0);
|
|
|
- try {
|
|
|
- zk.delete("/parent", -1);
|
|
|
- fail("Should have received a not equals message");
|
|
|
- } catch (KeeperException e) {
|
|
|
- assertEquals(KeeperException.Code.NotEmpty, e.getCode());
|
|
|
+ // verify all the servers reporting same number of nodes
|
|
|
+ for (int i = 1; i < parts.length; i++) {
|
|
|
+ assertEquals("node count not consistent", counts[i-1], counts[i]);
|
|
|
}
|
|
|
- zk.delete("/parent/child", -1);
|
|
|
- zk.delete("/parent", -1);
|
|
|
- zk.close();
|
|
|
}
|
|
|
|
|
|
+
|
|
|
@Test
|
|
|
- public void testHammer() throws IOException,
|
|
|
- InterruptedException, KeeperException {
|
|
|
- File tmpDir = File.createTempFile("test", ".junit", baseTest);
|
|
|
- tmpDir = new File(tmpDir + ".dir");
|
|
|
- tmpDir.mkdirs();
|
|
|
- try {
|
|
|
- final int threadCount = 10;
|
|
|
- final int childCount = 1000;
|
|
|
- ArrayList<HammerThread> threads = new ArrayList<HammerThread>(
|
|
|
- threadCount);
|
|
|
- long start = System.currentTimeMillis();
|
|
|
- for (int i = 0; i < threadCount; i++) {
|
|
|
- Thread.sleep(10);
|
|
|
- ZooKeeper zk = createClient(this);
|
|
|
- String prefix = "/test-" + i;
|
|
|
- zk.create(prefix, new byte[0], Ids.OPEN_ACL_UNSAFE, 0);
|
|
|
- prefix += "/";
|
|
|
- threads.add(new HammerThread(zk, prefix, childCount));
|
|
|
- }
|
|
|
+ public void testHammer()
|
|
|
+ throws IOException, InterruptedException, KeeperException
|
|
|
+ {
|
|
|
+ final int threadCount = 10;
|
|
|
+ final int childCount = 1000;
|
|
|
+
|
|
|
+ HammerThread[] threads = new HammerThread[threadCount];
|
|
|
+ long start = System.currentTimeMillis();
|
|
|
+ for (int i = 0; i < threads.length; i++) {
|
|
|
+ Thread.sleep(10);
|
|
|
+ ZooKeeper zk = createClient(this);
|
|
|
+ String prefix = "/test-" + i;
|
|
|
+ zk.create(prefix, new byte[0], Ids.OPEN_ACL_UNSAFE, 0);
|
|
|
+ prefix += "/";
|
|
|
+ HammerThread thread =
|
|
|
+ new HammerThread("HammerThread-" + i, zk, prefix, childCount);
|
|
|
+ thread.start();
|
|
|
+
|
|
|
+ threads[i] = thread;
|
|
|
+ }
|
|
|
+
|
|
|
+ // look for the clients to finish their create operations
|
|
|
+ LOG.info("Starting check for completed hammers");
|
|
|
+ int workingCount = threads.length;
|
|
|
+ for (int i = 0; i < 120; i++) {
|
|
|
+ Thread.sleep(10000);
|
|
|
for (HammerThread h : threads) {
|
|
|
- h.join();
|
|
|
- h.close();
|
|
|
+ if (!h.isAlive() || h.current == h.count) {
|
|
|
+ workingCount--;
|
|
|
+ }
|
|
|
}
|
|
|
- System.err.println(new Date() + " Total time "
|
|
|
- + (System.currentTimeMillis() - start));
|
|
|
- ZooKeeper zk = createClient(this);
|
|
|
- LOG.error("******************* Connected to ZooKeeper" + new Date());
|
|
|
- for (int i = 0; i < threadCount; i++) {
|
|
|
- System.err.println("Doing thread: " + i + " " + new Date());
|
|
|
- List<String> children =
|
|
|
- zk.getChildren("/test-" + i, false);
|
|
|
- assertEquals(childCount, children.size());
|
|
|
+ if (workingCount == 0) {
|
|
|
+ break;
|
|
|
}
|
|
|
- for (int i = 0; i < threadCount; i++) {
|
|
|
- List<String> children =
|
|
|
- zk.getChildren("/test-" + i, false);
|
|
|
- assertEquals(childCount, children.size());
|
|
|
+ workingCount = threads.length;
|
|
|
+ }
|
|
|
+ if (workingCount > 0) {
|
|
|
+ for (HammerThread h : threads) {
|
|
|
+ LOG.warn(h.getName() + " never finished creation, current:"
|
|
|
+ + h.current);
|
|
|
}
|
|
|
- } finally {
|
|
|
- // recursiveDelete(tmpDir);
|
|
|
+ } else {
|
|
|
+ LOG.info("Hammer threads completed creation operations");
|
|
|
+ }
|
|
|
+
|
|
|
+ for (HammerThread h : threads) {
|
|
|
+ final int safetyFactor = 3;
|
|
|
+ verifyThreadTerminated(h,
|
|
|
+ threadCount * childCount
|
|
|
+ * HammerThread.LATENCY * safetyFactor);
|
|
|
+ }
|
|
|
+ LOG.info(new Date() + " Total time "
|
|
|
+ + (System.currentTimeMillis() - start));
|
|
|
+
|
|
|
+ ZooKeeper zk = createClient(this);
|
|
|
+
|
|
|
+ LOG.info("******************* Connected to ZooKeeper" + new Date());
|
|
|
+ for (int i = 0; i < threadCount; i++) {
|
|
|
+ LOG.info("Doing thread: " + i + " " + new Date());
|
|
|
+ List<String> children =
|
|
|
+ zk.getChildren("/test-" + i, false);
|
|
|
+ assertEquals(childCount, children.size());
|
|
|
+ }
|
|
|
+ for (int i = 0; i < threadCount; i++) {
|
|
|
+ List<String> children =
|
|
|
+ zk.getChildren("/test-" + i, false);
|
|
|
+ assertEquals(childCount, children.size());
|
|
|
}
|
|
|
}
|
|
|
|