|
@@ -159,6 +159,7 @@ public class ActiveStandbyElector implements StatCallback, StringCallback {
|
|
|
private int createRetryCount = 0;
|
|
|
private int statRetryCount = 0;
|
|
|
private ZooKeeper zkClient;
|
|
|
+ private WatcherWithClientRef watcher;
|
|
|
private ConnectionState zkConnectionState = ConnectionState.TERMINATED;
|
|
|
|
|
|
private final ActiveStandbyElectorCallback appClient;
|
|
@@ -246,6 +247,11 @@ public class ActiveStandbyElector implements StatCallback, StringCallback {
|
|
|
if (data == null) {
|
|
|
throw new HadoopIllegalArgumentException("data cannot be null");
|
|
|
}
|
|
|
+
|
|
|
+ if (wantToBeInElection) {
|
|
|
+ LOG.info("Already in election. Not re-connecting.");
|
|
|
+ return;
|
|
|
+ }
|
|
|
|
|
|
appData = new byte[data.length];
|
|
|
System.arraycopy(data, 0, appData, 0, data.length);
|
|
@@ -615,7 +621,7 @@ public class ActiveStandbyElector implements StatCallback, StringCallback {
|
|
|
// watcher after constructing ZooKeeper, we may miss that event. Instead,
|
|
|
// we construct the watcher first, and have it block any events it receives
|
|
|
// before we can set its ZooKeeper reference.
|
|
|
- WatcherWithClientRef watcher = new WatcherWithClientRef();
|
|
|
+ watcher = new WatcherWithClientRef();
|
|
|
ZooKeeper zk = new ZooKeeper(zkHostPort, zkSessionTimeout, watcher);
|
|
|
watcher.setZooKeeperRef(zk);
|
|
|
|
|
@@ -753,6 +759,7 @@ public class ActiveStandbyElector implements StatCallback, StringCallback {
|
|
|
e);
|
|
|
}
|
|
|
zkClient = null;
|
|
|
+ watcher = null;
|
|
|
}
|
|
|
zkClient = getNewZooKeeper();
|
|
|
LOG.debug("Created new connection for " + this);
|
|
@@ -765,12 +772,14 @@ public class ActiveStandbyElector implements StatCallback, StringCallback {
|
|
|
LOG.debug("Terminating ZK connection for " + this);
|
|
|
ZooKeeper tempZk = zkClient;
|
|
|
zkClient = null;
|
|
|
+ watcher = null;
|
|
|
try {
|
|
|
tempZk.close();
|
|
|
} catch(InterruptedException e) {
|
|
|
LOG.warn(e);
|
|
|
}
|
|
|
zkConnectionState = ConnectionState.TERMINATED;
|
|
|
+ wantToBeInElection = false;
|
|
|
}
|
|
|
|
|
|
private void reset() {
|
|
@@ -914,7 +923,7 @@ public class ActiveStandbyElector implements StatCallback, StringCallback {
|
|
|
|
|
|
private void monitorLockNodeAsync() {
|
|
|
zkClient.exists(zkLockFilePath,
|
|
|
- new WatcherWithClientRef(zkClient), this,
|
|
|
+ watcher, this,
|
|
|
zkClient);
|
|
|
}
|
|
|
|
|
@@ -1015,13 +1024,6 @@ public class ActiveStandbyElector implements StatCallback, StringCallback {
|
|
|
* Latch used to wait until the reference to ZooKeeper is set.
|
|
|
*/
|
|
|
private CountDownLatch hasSetZooKeeper = new CountDownLatch(1);
|
|
|
-
|
|
|
- private WatcherWithClientRef() {
|
|
|
- }
|
|
|
-
|
|
|
- private WatcherWithClientRef(ZooKeeper zk) {
|
|
|
- setZooKeeperRef(zk);
|
|
|
- }
|
|
|
|
|
|
/**
|
|
|
* Waits for the next event from ZooKeeper to arrive.
|