|
@@ -17,60 +17,42 @@
|
|
|
|
|
|
package org.apache.zookeeper.test;
|
|
|
|
|
|
+import static org.junit.Assert.assertEquals;
|
|
|
+import static org.junit.Assert.assertTrue;
|
|
|
+import static org.junit.Assert.fail;
|
|
|
+
|
|
|
+import java.util.ArrayList;
|
|
|
+import java.util.Arrays;
|
|
|
+import java.util.List;
|
|
|
+import java.util.concurrent.CountDownLatch;
|
|
|
+import java.util.concurrent.TimeUnit;
|
|
|
+
|
|
|
import org.apache.log4j.Logger;
|
|
|
-import org.apache.zookeeper.*;
|
|
|
+import org.apache.zookeeper.AsyncCallback;
|
|
|
+import org.apache.zookeeper.CreateMode;
|
|
|
+import org.apache.zookeeper.KeeperException;
|
|
|
+import org.apache.zookeeper.Op;
|
|
|
+import org.apache.zookeeper.OpResult;
|
|
|
+import org.apache.zookeeper.OpResult.ErrorResult;
|
|
|
+import org.apache.zookeeper.WatchedEvent;
|
|
|
+import org.apache.zookeeper.Watcher;
|
|
|
import org.apache.zookeeper.ZooDefs.Ids;
|
|
|
-import org.apache.zookeeper.server.ServerCnxnFactory;
|
|
|
+import org.apache.zookeeper.ZooKeeper;
|
|
|
import org.apache.zookeeper.server.SyncRequestProcessor;
|
|
|
-import org.apache.zookeeper.server.ZooKeeperServer;
|
|
|
-import org.apache.zookeeper.OpResult.ErrorResult;
|
|
|
-import org.junit.After;
|
|
|
import org.junit.Assert;
|
|
|
import org.junit.Before;
|
|
|
import org.junit.Test;
|
|
|
|
|
|
-import java.io.File;
|
|
|
-import java.util.Arrays;
|
|
|
-import java.util.List;
|
|
|
-import java.util.ArrayList;
|
|
|
-
|
|
|
-import static org.apache.zookeeper.test.ClientBase.CONNECTION_TIMEOUT;
|
|
|
-import static org.junit.Assert.assertFalse;
|
|
|
-import static org.junit.Assert.assertTrue;
|
|
|
-import static org.junit.Assert.fail;
|
|
|
-
|
|
|
-public class MultiTransactionTest extends ZKTestCase implements Watcher {
|
|
|
+public class MultiTransactionTest extends ClientBase {
|
|
|
private static final Logger LOG = Logger.getLogger(MultiTransactionTest.class);
|
|
|
|
|
|
- private static final String HOSTPORT = "127.0.0.1:" + PortAssignment.unique();
|
|
|
-
|
|
|
private ZooKeeper zk;
|
|
|
- private ServerCnxnFactory serverFactory;
|
|
|
-
|
|
|
- @Override
|
|
|
- public void process(WatchedEvent event) {
|
|
|
- // ignore
|
|
|
- }
|
|
|
|
|
|
@Before
|
|
|
- public void setupZk() throws Exception {
|
|
|
- File tmpDir = ClientBase.createTmpDir();
|
|
|
- ClientBase.setupTestEnv();
|
|
|
- ZooKeeperServer zks = new ZooKeeperServer(tmpDir, tmpDir, 3000);
|
|
|
+ public void setUp() throws Exception {
|
|
|
SyncRequestProcessor.setSnapCount(150);
|
|
|
- final int PORT = Integer.parseInt(HOSTPORT.split(":")[1]);
|
|
|
- serverFactory = ServerCnxnFactory.createFactory(PORT, -1);
|
|
|
- serverFactory.startup(zks);
|
|
|
- LOG.info("starting up the zookeeper server .. waiting");
|
|
|
- Assert.assertTrue("waiting for server being up",
|
|
|
- ClientBase.waitForServerUp(HOSTPORT, CONNECTION_TIMEOUT));
|
|
|
- zk = new ZooKeeper(HOSTPORT, CONNECTION_TIMEOUT, this);
|
|
|
- }
|
|
|
-
|
|
|
- @After
|
|
|
- public void shutdownServer() throws Exception {
|
|
|
- zk.close();
|
|
|
- serverFactory.shutdown();
|
|
|
+ super.setUp();
|
|
|
+ zk = createClient();
|
|
|
}
|
|
|
|
|
|
@Test
|
|
@@ -227,20 +209,17 @@ public class MultiTransactionTest extends ZKTestCase implements Watcher {
|
|
|
public void testWatchesTriggered() throws KeeperException, InterruptedException {
|
|
|
HasTriggeredWatcher watcher = new HasTriggeredWatcher();
|
|
|
zk.getChildren("/", watcher);
|
|
|
- zk.sync("/", null, null);
|
|
|
zk.multi(Arrays.asList(
|
|
|
Op.create("/t", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT),
|
|
|
Op.delete("/t", -1)
|
|
|
));
|
|
|
- zk.sync("/", null, null);
|
|
|
- assertTrue(watcher.triggered);
|
|
|
+ assertTrue(watcher.triggered.await(CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS));
|
|
|
}
|
|
|
|
|
|
@Test
|
|
|
public void testNoWatchesTriggeredForFailedMultiRequest() throws InterruptedException, KeeperException {
|
|
|
HasTriggeredWatcher watcher = new HasTriggeredWatcher();
|
|
|
zk.getChildren("/", watcher);
|
|
|
- zk.sync("/", null, null);
|
|
|
try {
|
|
|
zk.multi(Arrays.asList(
|
|
|
Op.create("/t", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT),
|
|
@@ -250,15 +229,28 @@ public class MultiTransactionTest extends ZKTestCase implements Watcher {
|
|
|
} catch (KeeperException.NoNodeException e) {
|
|
|
// expected
|
|
|
}
|
|
|
- zk.sync("/", null, null);
|
|
|
- assertFalse(watcher.triggered);
|
|
|
+ SyncCallback cb = new SyncCallback();
|
|
|
+ zk.sync("/", cb, null);
|
|
|
+
|
|
|
+ // by waiting for the callback we're assured that the event queue is flushed
|
|
|
+ cb.done.await(CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS);
|
|
|
+ assertEquals(1, watcher.triggered.getCount());
|
|
|
}
|
|
|
|
|
|
- private static final class HasTriggeredWatcher implements Watcher {
|
|
|
- boolean triggered = false;
|
|
|
+ private static class HasTriggeredWatcher implements Watcher {
|
|
|
+ private final CountDownLatch triggered = new CountDownLatch(1);
|
|
|
+
|
|
|
@Override
|
|
|
public void process(WatchedEvent event) {
|
|
|
- triggered = true;
|
|
|
+ triggered.countDown();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ private static class SyncCallback implements AsyncCallback.VoidCallback {
|
|
|
+ private final CountDownLatch done = new CountDownLatch(1);
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void processResult(int rc, String path, Object ctx) {
|
|
|
+ done.countDown();
|
|
|
}
|
|
|
}
|
|
|
}
|