|
@@ -240,8 +240,6 @@ public class ActiveStandbyElector implements StatCallback, StringCallback {
|
|
|
public synchronized void joinElection(byte[] data)
|
|
|
throws HadoopIllegalArgumentException {
|
|
|
|
|
|
- LOG.debug("Attempting active election");
|
|
|
-
|
|
|
if (data == null) {
|
|
|
throw new HadoopIllegalArgumentException("data cannot be null");
|
|
|
}
|
|
@@ -249,6 +247,7 @@ public class ActiveStandbyElector implements StatCallback, StringCallback {
|
|
|
appData = new byte[data.length];
|
|
|
System.arraycopy(data, 0, appData, 0, data.length);
|
|
|
|
|
|
+ LOG.debug("Attempting active election for " + this);
|
|
|
joinElectionInternal();
|
|
|
}
|
|
|
|
|
@@ -272,6 +271,9 @@ public class ActiveStandbyElector implements StatCallback, StringCallback {
|
|
|
*/
|
|
|
public synchronized void ensureParentZNode()
|
|
|
throws IOException, InterruptedException {
|
|
|
+ Preconditions.checkState(!wantToBeInElection,
|
|
|
+ "ensureParentZNode() may not be called while in the election");
|
|
|
+
|
|
|
String pathParts[] = znodeWorkingDir.split("/");
|
|
|
Preconditions.checkArgument(pathParts.length >= 1 &&
|
|
|
"".equals(pathParts[0]),
|
|
@@ -305,6 +307,9 @@ public class ActiveStandbyElector implements StatCallback, StringCallback {
|
|
|
*/
|
|
|
public synchronized void clearParentZNode()
|
|
|
throws IOException, InterruptedException {
|
|
|
+ Preconditions.checkState(!wantToBeInElection,
|
|
|
+ "clearParentZNode() may not be called while in the election");
|
|
|
+
|
|
|
try {
|
|
|
LOG.info("Recursively deleting " + znodeWorkingDir + " from ZK...");
|
|
|
|
|
@@ -393,7 +398,8 @@ public class ActiveStandbyElector implements StatCallback, StringCallback {
|
|
|
String name) {
|
|
|
if (isStaleClient(ctx)) return;
|
|
|
LOG.debug("CreateNode result: " + rc + " for path: " + path
|
|
|
- + " connectionState: " + zkConnectionState);
|
|
|
+ + " connectionState: " + zkConnectionState +
|
|
|
+ " for " + this);
|
|
|
|
|
|
Code code = Code.get(rc);
|
|
|
if (isSuccess(code)) {
|
|
@@ -449,8 +455,13 @@ public class ActiveStandbyElector implements StatCallback, StringCallback {
|
|
|
public synchronized void processResult(int rc, String path, Object ctx,
|
|
|
Stat stat) {
|
|
|
if (isStaleClient(ctx)) return;
|
|
|
+
|
|
|
+ assert wantToBeInElection :
|
|
|
+ "Got a StatNode result after quitting election";
|
|
|
+
|
|
|
LOG.debug("StatNode result: " + rc + " for path: " + path
|
|
|
- + " connectionState: " + zkConnectionState);
|
|
|
+ + " connectionState: " + zkConnectionState + " for " + this);
|
|
|
+
|
|
|
|
|
|
Code code = Code.get(rc);
|
|
|
if (isSuccess(code)) {
|
|
@@ -517,7 +528,8 @@ public class ActiveStandbyElector implements StatCallback, StringCallback {
|
|
|
if (isStaleClient(zk)) return;
|
|
|
LOG.debug("Watcher event type: " + eventType + " with state:"
|
|
|
+ event.getState() + " for path:" + event.getPath()
|
|
|
- + " connectionState: " + zkConnectionState);
|
|
|
+ + " connectionState: " + zkConnectionState
|
|
|
+ + " for " + this);
|
|
|
|
|
|
if (eventType == Event.EventType.None) {
|
|
|
// the connection state has changed
|
|
@@ -528,7 +540,8 @@ public class ActiveStandbyElector implements StatCallback, StringCallback {
|
|
|
// be undone
|
|
|
ConnectionState prevConnectionState = zkConnectionState;
|
|
|
zkConnectionState = ConnectionState.CONNECTED;
|
|
|
- if (prevConnectionState == ConnectionState.DISCONNECTED) {
|
|
|
+ if (prevConnectionState == ConnectionState.DISCONNECTED &&
|
|
|
+ wantToBeInElection) {
|
|
|
monitorActiveStatus();
|
|
|
}
|
|
|
break;
|
|
@@ -600,12 +613,14 @@ public class ActiveStandbyElector implements StatCallback, StringCallback {
|
|
|
}
|
|
|
|
|
|
private void fatalError(String errorMessage) {
|
|
|
+ LOG.fatal(errorMessage);
|
|
|
reset();
|
|
|
appClient.notifyFatalError(errorMessage);
|
|
|
}
|
|
|
|
|
|
private void monitorActiveStatus() {
|
|
|
- LOG.debug("Monitoring active leader");
|
|
|
+ assert wantToBeInElection;
|
|
|
+ LOG.debug("Monitoring active leader for " + this);
|
|
|
statRetryCount = 0;
|
|
|
monitorLockNodeAsync();
|
|
|
}
|
|
@@ -688,7 +703,7 @@ public class ActiveStandbyElector implements StatCallback, StringCallback {
|
|
|
int connectionRetryCount = 0;
|
|
|
boolean success = false;
|
|
|
while(!success && connectionRetryCount < NUM_RETRIES) {
|
|
|
- LOG.debug("Establishing zookeeper connection");
|
|
|
+ LOG.debug("Establishing zookeeper connection for " + this);
|
|
|
try {
|
|
|
createConnection();
|
|
|
success = true;
|
|
@@ -703,13 +718,14 @@ public class ActiveStandbyElector implements StatCallback, StringCallback {
|
|
|
|
|
|
private void createConnection() throws IOException {
|
|
|
zkClient = getNewZooKeeper();
|
|
|
+ LOG.debug("Created new connection for " + this);
|
|
|
}
|
|
|
|
|
|
private void terminateConnection() {
|
|
|
if (zkClient == null) {
|
|
|
return;
|
|
|
}
|
|
|
- LOG.debug("Terminating ZK connection");
|
|
|
+ LOG.debug("Terminating ZK connection for " + this);
|
|
|
ZooKeeper tempZk = zkClient;
|
|
|
zkClient = null;
|
|
|
try {
|
|
@@ -735,7 +751,7 @@ public class ActiveStandbyElector implements StatCallback, StringCallback {
|
|
|
Stat oldBreadcrumbStat = fenceOldActive();
|
|
|
writeBreadCrumbNode(oldBreadcrumbStat);
|
|
|
|
|
|
- LOG.debug("Becoming active");
|
|
|
+ LOG.debug("Becoming active for " + this);
|
|
|
appClient.becomeActive();
|
|
|
state = State.ACTIVE;
|
|
|
return true;
|
|
@@ -838,7 +854,7 @@ public class ActiveStandbyElector implements StatCallback, StringCallback {
|
|
|
|
|
|
private void becomeStandby() {
|
|
|
if (state != State.STANDBY) {
|
|
|
- LOG.debug("Becoming standby");
|
|
|
+ LOG.debug("Becoming standby for " + this);
|
|
|
state = State.STANDBY;
|
|
|
appClient.becomeStandby();
|
|
|
}
|
|
@@ -846,7 +862,7 @@ public class ActiveStandbyElector implements StatCallback, StringCallback {
|
|
|
|
|
|
private void enterNeutralMode() {
|
|
|
if (state != State.NEUTRAL) {
|
|
|
- LOG.debug("Entering neutral mode");
|
|
|
+ LOG.debug("Entering neutral mode for " + this);
|
|
|
state = State.NEUTRAL;
|
|
|
appClient.enterNeutralMode();
|
|
|
}
|
|
@@ -943,8 +959,14 @@ public class ActiveStandbyElector implements StatCallback, StringCallback {
|
|
|
|
|
|
@Override
|
|
|
public void process(WatchedEvent event) {
|
|
|
- ActiveStandbyElector.this.processWatchEvent(
|
|
|
- zk, event);
|
|
|
+ try {
|
|
|
+ ActiveStandbyElector.this.processWatchEvent(
|
|
|
+ zk, event);
|
|
|
+ } catch (Throwable t) {
|
|
|
+ fatalError(
|
|
|
+ "Failed to process watcher event " + event + ": " +
|
|
|
+ StringUtils.stringifyException(t));
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -972,5 +994,13 @@ public class ActiveStandbyElector implements StatCallback, StringCallback {
|
|
|
}
|
|
|
return false;
|
|
|
}
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public String toString() {
|
|
|
+ return "elector id=" + System.identityHashCode(this) +
|
|
|
+ " appData=" +
|
|
|
+ ((appData == null) ? "null" : StringUtils.byteToHexString(appData)) +
|
|
|
+ " cb=" + appClient;
|
|
|
+ }
|
|
|
|
|
|
}
|