|
@@ -28,6 +28,7 @@ import org.apache.zookeeper.WatchedEvent;
|
|
|
import org.apache.zookeeper.Watcher;
|
|
|
import org.apache.zookeeper.ZooDefs;
|
|
|
import org.apache.zookeeper.ZooKeeper;
|
|
|
+import org.apache.zookeeper.Watcher.Event.KeeperState;
|
|
|
import org.apache.zookeeper.ZooDefs.Ids;
|
|
|
import org.apache.zookeeper.data.Stat;
|
|
|
import org.apache.zookeeper.server.quorum.Leader;
|
|
@@ -180,14 +181,14 @@ public class QuorumTest extends QuorumBase {
|
|
|
@Test
|
|
|
public void testSessionMoved() throws IOException, InterruptedException, KeeperException {
|
|
|
String hostPorts[] = qb.hostPort.split(",");
|
|
|
- ZooKeeper zk = new DisconnectableZooKeeper(hostPorts[0], ClientBase.CONNECTION_TIMEOUT, new Watcher() {
|
|
|
+ DisconnectableZooKeeper zk = new DisconnectableZooKeeper(hostPorts[0], ClientBase.CONNECTION_TIMEOUT, new Watcher() {
|
|
|
public void process(WatchedEvent event) {
|
|
|
}});
|
|
|
zk.create("/sessionMoveTest", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
|
|
|
// we want to loop through the list twice
|
|
|
for(int i = 0; i < hostPorts.length*2; i++) {
|
|
|
// This should stomp the zk handle
|
|
|
- ZooKeeper zknew = new DisconnectableZooKeeper(hostPorts[(i+1)%hostPorts.length], ClientBase.CONNECTION_TIMEOUT,
|
|
|
+ DisconnectableZooKeeper zknew = new DisconnectableZooKeeper(hostPorts[(i+1)%hostPorts.length], ClientBase.CONNECTION_TIMEOUT,
|
|
|
new Watcher() {public void process(WatchedEvent event) {
|
|
|
}},
|
|
|
zk.getSessionId(),
|
|
@@ -196,14 +197,23 @@ public class QuorumTest extends QuorumBase {
|
|
|
try {
|
|
|
zk.setData("/", new byte[1], -1);
|
|
|
fail("Should have lost the connection");
|
|
|
- } catch(KeeperException.SessionMovedException e) {
|
|
|
+ } catch(KeeperException.ConnectionLossException e) {
|
|
|
}
|
|
|
- //zk.close();
|
|
|
+ zk.disconnect(); // close w/o closing session
|
|
|
zk = zknew;
|
|
|
}
|
|
|
zk.close();
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
+ private static class DiscoWatcher implements Watcher {
|
|
|
+ volatile boolean zkDisco = false;
|
|
|
+ public void process(WatchedEvent event) {
|
|
|
+ if (event.getState() == KeeperState.Disconnected) {
|
|
|
+ zkDisco = true;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
@Test
|
|
|
/**
|
|
|
* Connect to two different servers with two different handles using the same session and
|
|
@@ -211,32 +221,41 @@ public class QuorumTest extends QuorumBase {
|
|
|
*/
|
|
|
public void testSessionMove() throws IOException, InterruptedException, KeeperException {
|
|
|
String hps[] = qb.hostPort.split(",");
|
|
|
- ZooKeeper zk = new DisconnectableZooKeeper(hps[0], ClientBase.CONNECTION_TIMEOUT, new Watcher() {
|
|
|
- public void process(WatchedEvent event) {
|
|
|
- }});
|
|
|
+ DiscoWatcher oldWatcher = new DiscoWatcher();
|
|
|
+ ZooKeeper zk = new DisconnectableZooKeeper(hps[0],
|
|
|
+ ClientBase.CONNECTION_TIMEOUT, oldWatcher);
|
|
|
zk.create("/t1", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
|
|
|
// This should stomp the zk handle
|
|
|
- ZooKeeper zknew = new DisconnectableZooKeeper(hps[1], ClientBase.CONNECTION_TIMEOUT, new Watcher() {
|
|
|
- public void process(WatchedEvent event) {
|
|
|
- }}, zk.getSessionId(), zk.getSessionPasswd());
|
|
|
+ DiscoWatcher watcher = new DiscoWatcher();
|
|
|
+ ZooKeeper zknew = new DisconnectableZooKeeper(hps[1],
|
|
|
+ ClientBase.CONNECTION_TIMEOUT, watcher, zk.getSessionId(),
|
|
|
+ zk.getSessionPasswd());
|
|
|
zknew.create("/t2", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
|
|
|
try {
|
|
|
zk.create("/t3", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
|
|
|
fail("Should have lost the connection");
|
|
|
- } catch(KeeperException.SessionMovedException e) {
|
|
|
+ } catch(KeeperException.ConnectionLossException e) {
|
|
|
+ // wait up to 30 seconds for the disco to be delivered
|
|
|
+ for (int i = 0; i < 30; i++) {
|
|
|
+ if (oldWatcher.zkDisco) {
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ Thread.sleep(1000);
|
|
|
+ }
|
|
|
+ assertTrue(oldWatcher.zkDisco);
|
|
|
}
|
|
|
|
|
|
ArrayList<ZooKeeper> toClose = new ArrayList<ZooKeeper>();
|
|
|
toClose.add(zknew);
|
|
|
// Let's just make sure it can still move
|
|
|
for(int i = 0; i < 10; i++) {
|
|
|
- zknew = new DisconnectableZooKeeper(hps[1], ClientBase.CONNECTION_TIMEOUT, new Watcher() {
|
|
|
- public void process(WatchedEvent event) {
|
|
|
- }}, zk.getSessionId(), zk.getSessionPasswd());
|
|
|
+ zknew = new DisconnectableZooKeeper(hps[1],
|
|
|
+ ClientBase.CONNECTION_TIMEOUT, new DiscoWatcher(),
|
|
|
+ zk.getSessionId(), zk.getSessionPasswd());
|
|
|
toClose.add(zknew);
|
|
|
zknew.create("/t-"+i, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
|
|
|
}
|
|
|
- for(ZooKeeper z: toClose) {
|
|
|
+ for (ZooKeeper z: toClose) {
|
|
|
z.close();
|
|
|
}
|
|
|
zk.close();
|