|
@@ -21,6 +21,8 @@ package org.apache.hadoop.ha;
|
|
import java.io.IOException;
|
|
import java.io.IOException;
|
|
import java.util.Arrays;
|
|
import java.util.Arrays;
|
|
import java.util.List;
|
|
import java.util.List;
|
|
|
|
+import java.util.concurrent.CountDownLatch;
|
|
|
|
+import java.util.concurrent.TimeUnit;
|
|
import java.util.concurrent.locks.Lock;
|
|
import java.util.concurrent.locks.Lock;
|
|
import java.util.concurrent.locks.ReentrantLock;
|
|
import java.util.concurrent.locks.ReentrantLock;
|
|
|
|
|
|
@@ -45,6 +47,7 @@ import org.apache.zookeeper.KeeperException.Code;
|
|
|
|
|
|
import com.google.common.annotations.VisibleForTesting;
|
|
import com.google.common.annotations.VisibleForTesting;
|
|
import com.google.common.base.Preconditions;
|
|
import com.google.common.base.Preconditions;
|
|
|
|
+import com.google.common.collect.Lists;
|
|
|
|
|
|
/**
|
|
/**
|
|
*
|
|
*
|
|
@@ -205,7 +208,7 @@ public class ActiveStandbyElector implements StatCallback, StringCallback {
|
|
int zookeeperSessionTimeout, String parentZnodeName, List<ACL> acl,
|
|
int zookeeperSessionTimeout, String parentZnodeName, List<ACL> acl,
|
|
List<ZKAuthInfo> authInfo,
|
|
List<ZKAuthInfo> authInfo,
|
|
ActiveStandbyElectorCallback app) throws IOException,
|
|
ActiveStandbyElectorCallback app) throws IOException,
|
|
- HadoopIllegalArgumentException {
|
|
|
|
|
|
+ HadoopIllegalArgumentException, KeeperException {
|
|
if (app == null || acl == null || parentZnodeName == null
|
|
if (app == null || acl == null || parentZnodeName == null
|
|
|| zookeeperHostPorts == null || zookeeperSessionTimeout <= 0) {
|
|
|| zookeeperHostPorts == null || zookeeperSessionTimeout <= 0) {
|
|
throw new HadoopIllegalArgumentException("Invalid argument");
|
|
throw new HadoopIllegalArgumentException("Invalid argument");
|
|
@@ -602,10 +605,24 @@ public class ActiveStandbyElector implements StatCallback, StringCallback {
|
|
*
|
|
*
|
|
* @return new zookeeper client instance
|
|
* @return new zookeeper client instance
|
|
* @throws IOException
|
|
* @throws IOException
|
|
|
|
+ * @throws KeeperException zookeeper connectionloss exception
|
|
*/
|
|
*/
|
|
- protected synchronized ZooKeeper getNewZooKeeper() throws IOException {
|
|
|
|
- ZooKeeper zk = new ZooKeeper(zkHostPort, zkSessionTimeout, null);
|
|
|
|
- zk.register(new WatcherWithClientRef(zk));
|
|
|
|
|
|
+ protected synchronized ZooKeeper getNewZooKeeper() throws IOException,
|
|
|
|
+ KeeperException {
|
|
|
|
+
|
|
|
|
+ // Unfortunately, the ZooKeeper constructor connects to ZooKeeper and
|
|
|
|
+ // may trigger the Connected event immediately. So, if we register the
|
|
|
|
+ // watcher after constructing ZooKeeper, we may miss that event. Instead,
|
|
|
|
+ // we construct the watcher first, and have it queue any events it receives
|
|
|
|
+ // before we can set its ZooKeeper reference.
|
|
|
|
+ WatcherWithClientRef watcher = new WatcherWithClientRef();
|
|
|
|
+ ZooKeeper zk = new ZooKeeper(zkHostPort, zkSessionTimeout, watcher);
|
|
|
|
+ watcher.setZooKeeperRef(zk);
|
|
|
|
+
|
|
|
|
+ // Wait for the asynchronous success/failure. This may throw an exception
|
|
|
|
+ // if we don't connect within the session timeout.
|
|
|
|
+ watcher.waitForZKConnectionEvent(zkSessionTimeout);
|
|
|
|
+
|
|
for (ZKAuthInfo auth : zkAuthInfo) {
|
|
for (ZKAuthInfo auth : zkAuthInfo) {
|
|
zk.addAuthInfo(auth.getScheme(), auth.getAuth());
|
|
zk.addAuthInfo(auth.getScheme(), auth.getAuth());
|
|
}
|
|
}
|
|
@@ -710,13 +727,16 @@ public class ActiveStandbyElector implements StatCallback, StringCallback {
|
|
} catch(IOException e) {
|
|
} catch(IOException e) {
|
|
LOG.warn(e);
|
|
LOG.warn(e);
|
|
sleepFor(5000);
|
|
sleepFor(5000);
|
|
|
|
+ } catch(KeeperException e) {
|
|
|
|
+ LOG.warn(e);
|
|
|
|
+ sleepFor(5000);
|
|
}
|
|
}
|
|
++connectionRetryCount;
|
|
++connectionRetryCount;
|
|
}
|
|
}
|
|
return success;
|
|
return success;
|
|
}
|
|
}
|
|
|
|
|
|
- private void createConnection() throws IOException {
|
|
|
|
|
|
+ private void createConnection() throws IOException, KeeperException {
|
|
if (zkClient != null) {
|
|
if (zkClient != null) {
|
|
try {
|
|
try {
|
|
zkClient.close();
|
|
zkClient.close();
|
|
@@ -973,14 +993,76 @@ public class ActiveStandbyElector implements StatCallback, StringCallback {
|
|
* events.
|
|
* events.
|
|
*/
|
|
*/
|
|
private final class WatcherWithClientRef implements Watcher {
|
|
private final class WatcherWithClientRef implements Watcher {
|
|
- private final ZooKeeper zk;
|
|
|
|
|
|
+ private ZooKeeper zk;
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Latch fired whenever any event arrives. This is used in order
|
|
|
|
+ * to wait for the Connected event when the client is first created.
|
|
|
|
+ */
|
|
|
|
+ private CountDownLatch hasReceivedEvent = new CountDownLatch(1);
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * If any events arrive before the reference to ZooKeeper is set,
|
|
|
|
+ * they get queued up and later forwarded when the reference is
|
|
|
|
+ * available.
|
|
|
|
+ */
|
|
|
|
+ private final List<WatchedEvent> queuedEvents = Lists.newLinkedList();
|
|
|
|
+
|
|
|
|
+ private WatcherWithClientRef() {
|
|
|
|
+ }
|
|
|
|
|
|
private WatcherWithClientRef(ZooKeeper zk) {
|
|
private WatcherWithClientRef(ZooKeeper zk) {
|
|
this.zk = zk;
|
|
this.zk = zk;
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Waits for the next event from ZooKeeper to arrive.
|
|
|
|
+ *
|
|
|
|
+ * @param connectionTimeoutMs zookeeper connection timeout in milliseconds
|
|
|
|
+ * @throws KeeperException if the connection attempt times out. This will
|
|
|
|
+ * be a ZooKeeper ConnectionLoss exception code.
|
|
|
|
+ * @throws IOException if interrupted while connecting to ZooKeeper
|
|
|
|
+ */
|
|
|
|
+ private void waitForZKConnectionEvent(int connectionTimeoutMs)
|
|
|
|
+ throws KeeperException, IOException {
|
|
|
|
+ try {
|
|
|
|
+ if (!hasReceivedEvent.await(connectionTimeoutMs, TimeUnit.MILLISECONDS)) {
|
|
|
|
+ LOG.error("Connection timed out: couldn't connect to ZooKeeper in "
|
|
|
|
+ + connectionTimeoutMs + " milliseconds");
|
|
|
|
+ synchronized (this) {
|
|
|
|
+ zk.close();
|
|
|
|
+ }
|
|
|
|
+ throw KeeperException.create(Code.CONNECTIONLOSS);
|
|
|
|
+ }
|
|
|
|
+ } catch (InterruptedException e) {
|
|
|
|
+ Thread.currentThread().interrupt();
|
|
|
|
+ throw new IOException(
|
|
|
|
+ "Interrupted when connecting to zookeeper server", e);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private synchronized void setZooKeeperRef(ZooKeeper zk) {
|
|
|
|
+ Preconditions.checkState(this.zk == null,
|
|
|
|
+ "zk already set -- must be set exactly once");
|
|
|
|
+ this.zk = zk;
|
|
|
|
+
|
|
|
|
+ for (WatchedEvent e : queuedEvents) {
|
|
|
|
+ forwardEvent(e);
|
|
|
|
+ }
|
|
|
|
+ queuedEvents.clear();
|
|
|
|
+ }
|
|
|
|
|
|
@Override
|
|
@Override
|
|
- public void process(WatchedEvent event) {
|
|
|
|
|
|
+ public synchronized void process(WatchedEvent event) {
|
|
|
|
+ if (zk != null) {
|
|
|
|
+ forwardEvent(event);
|
|
|
|
+ } else {
|
|
|
|
+ queuedEvents.add(event);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private void forwardEvent(WatchedEvent event) {
|
|
|
|
+ hasReceivedEvent.countDown();
|
|
try {
|
|
try {
|
|
ActiveStandbyElector.this.processWatchEvent(
|
|
ActiveStandbyElector.this.processWatchEvent(
|
|
zk, event);
|
|
zk, event);
|
|
@@ -1024,5 +1106,4 @@ public class ActiveStandbyElector implements StatCallback, StringCallback {
|
|
((appData == null) ? "null" : StringUtils.byteToHexString(appData)) +
|
|
((appData == null) ? "null" : StringUtils.byteToHexString(appData)) +
|
|
" cb=" + appClient;
|
|
" cb=" + appClient;
|
|
}
|
|
}
|
|
-
|
|
|
|
}
|
|
}
|