|
@@ -613,7 +613,7 @@ public class ActiveStandbyElector implements StatCallback, StringCallback {
|
|
|
// Unfortunately, the ZooKeeper constructor connects to ZooKeeper and
|
|
|
// may trigger the Connected event immediately. So, if we register the
|
|
|
// watcher after constructing ZooKeeper, we may miss that event. Instead,
|
|
|
- // we construct the watcher first, and have it queue any events it receives
|
|
|
+ // we construct the watcher first, and have it block any events it receives
|
|
|
// before we can set its ZooKeeper reference.
|
|
|
WatcherWithClientRef watcher = new WatcherWithClientRef();
|
|
|
ZooKeeper zk = new ZooKeeper(zkHostPort, zkSessionTimeout, watcher);
|
|
@@ -1002,19 +1002,17 @@ public class ActiveStandbyElector implements StatCallback, StringCallback {
|
|
|
private CountDownLatch hasReceivedEvent = new CountDownLatch(1);
|
|
|
|
|
|
/**
|
|
|
- * If any events arrive before the reference to ZooKeeper is set,
|
|
|
- * they get queued up and later forwarded when the reference is
|
|
|
- * available.
|
|
|
+ * Latch used to wait until the reference to ZooKeeper is set.
|
|
|
*/
|
|
|
- private final List<WatchedEvent> queuedEvents = Lists.newLinkedList();
|
|
|
+ private CountDownLatch hasSetZooKeeper = new CountDownLatch(1);
|
|
|
|
|
|
private WatcherWithClientRef() {
|
|
|
}
|
|
|
|
|
|
private WatcherWithClientRef(ZooKeeper zk) {
|
|
|
- this.zk = zk;
|
|
|
+ setZooKeeperRef(zk);
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
/**
|
|
|
* Waits for the next event from ZooKeeper to arrive.
|
|
|
*
|
|
@@ -1029,9 +1027,7 @@ public class ActiveStandbyElector implements StatCallback, StringCallback {
|
|
|
if (!hasReceivedEvent.await(connectionTimeoutMs, TimeUnit.MILLISECONDS)) {
|
|
|
LOG.error("Connection timed out: couldn't connect to ZooKeeper in "
|
|
|
+ connectionTimeoutMs + " milliseconds");
|
|
|
- synchronized (this) {
|
|
|
- zk.close();
|
|
|
- }
|
|
|
+ zk.close();
|
|
|
throw KeeperException.create(Code.CONNECTIONLOSS);
|
|
|
}
|
|
|
} catch (InterruptedException e) {
|
|
@@ -1041,29 +1037,18 @@ public class ActiveStandbyElector implements StatCallback, StringCallback {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private synchronized void setZooKeeperRef(ZooKeeper zk) {
|
|
|
+ private void setZooKeeperRef(ZooKeeper zk) {
|
|
|
Preconditions.checkState(this.zk == null,
|
|
|
"zk already set -- must be set exactly once");
|
|
|
this.zk = zk;
|
|
|
-
|
|
|
- for (WatchedEvent e : queuedEvents) {
|
|
|
- forwardEvent(e);
|
|
|
- }
|
|
|
- queuedEvents.clear();
|
|
|
+ hasSetZooKeeper.countDown();
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- public synchronized void process(WatchedEvent event) {
|
|
|
- if (zk != null) {
|
|
|
- forwardEvent(event);
|
|
|
- } else {
|
|
|
- queuedEvents.add(event);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- private void forwardEvent(WatchedEvent event) {
|
|
|
+ public void process(WatchedEvent event) {
|
|
|
hasReceivedEvent.countDown();
|
|
|
try {
|
|
|
+ hasSetZooKeeper.await(zkSessionTimeout, TimeUnit.MILLISECONDS);
|
|
|
ActiveStandbyElector.this.processWatchEvent(
|
|
|
zk, event);
|
|
|
} catch (Throwable t) {
|