|
@@ -180,27 +180,36 @@ public class LeaderElectionSupport implements Watcher {
|
|
state = State.OFFER;
|
|
state = State.OFFER;
|
|
dispatchEvent(EventType.OFFER_START);
|
|
dispatchEvent(EventType.OFFER_START);
|
|
|
|
|
|
- leaderOffer = new LeaderOffer();
|
|
|
|
-
|
|
|
|
- leaderOffer.setHostName(hostName);
|
|
|
|
- leaderOffer.setNodePath(zooKeeper.create(rootNodeName + "/" + "n_",
|
|
|
|
- hostName.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
|
|
|
|
|
|
+ LeaderOffer newLeaderOffer = new LeaderOffer();
|
|
|
|
+ byte[] hostnameBytes;
|
|
|
|
+ synchronized (this) {
|
|
|
|
+ newLeaderOffer.setHostName(hostName);
|
|
|
|
+ hostnameBytes = hostName.getBytes();
|
|
|
|
+ newLeaderOffer.setNodePath(zooKeeper.create(rootNodeName + "/" + "n_",
|
|
|
|
+ hostnameBytes, ZooDefs.Ids.OPEN_ACL_UNSAFE,
|
|
CreateMode.EPHEMERAL_SEQUENTIAL));
|
|
CreateMode.EPHEMERAL_SEQUENTIAL));
|
|
-
|
|
|
|
|
|
+ leaderOffer = newLeaderOffer;
|
|
|
|
+ }
|
|
logger.debug("Created leader offer {}", leaderOffer);
|
|
logger.debug("Created leader offer {}", leaderOffer);
|
|
|
|
|
|
dispatchEvent(EventType.OFFER_COMPLETE);
|
|
dispatchEvent(EventType.OFFER_COMPLETE);
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ private synchronized LeaderOffer getLeaderOffer() {
|
|
|
|
+ return leaderOffer;
|
|
|
|
+ }
|
|
|
|
+
|
|
private void determineElectionStatus() throws KeeperException,
|
|
private void determineElectionStatus() throws KeeperException,
|
|
InterruptedException {
|
|
InterruptedException {
|
|
|
|
|
|
state = State.DETERMINE;
|
|
state = State.DETERMINE;
|
|
dispatchEvent(EventType.DETERMINE_START);
|
|
dispatchEvent(EventType.DETERMINE_START);
|
|
|
|
|
|
- String[] components = leaderOffer.getNodePath().split("/");
|
|
|
|
|
|
+ LeaderOffer currentLeaderOffer = getLeaderOffer();
|
|
|
|
+
|
|
|
|
+ String[] components = currentLeaderOffer.getNodePath().split("/");
|
|
|
|
|
|
- leaderOffer.setId(Integer.valueOf(components[components.length - 1]
|
|
|
|
|
|
+ currentLeaderOffer.setId(Integer.valueOf(components[components.length - 1]
|
|
.substring("n_".length())));
|
|
.substring("n_".length())));
|
|
|
|
|
|
List<LeaderOffer> leaderOffers = toLeaderOffers(zooKeeper.getChildren(
|
|
List<LeaderOffer> leaderOffers = toLeaderOffers(zooKeeper.getChildren(
|
|
@@ -215,7 +224,7 @@ public class LeaderElectionSupport implements Watcher {
|
|
for (int i = 0; i < leaderOffers.size(); i++) {
|
|
for (int i = 0; i < leaderOffers.size(); i++) {
|
|
LeaderOffer leaderOffer = leaderOffers.get(i);
|
|
LeaderOffer leaderOffer = leaderOffers.get(i);
|
|
|
|
|
|
- if (leaderOffer.getId().equals(this.leaderOffer.getId())) {
|
|
|
|
|
|
+ if (leaderOffer.getId().equals(currentLeaderOffer.getId())) {
|
|
logger.debug("There are {} leader offers. I am {} in line.",
|
|
logger.debug("There are {} leader offers. I am {} in line.",
|
|
leaderOffers.size(), i);
|
|
leaderOffers.size(), i);
|
|
|
|
|
|
@@ -237,7 +246,7 @@ public class LeaderElectionSupport implements Watcher {
|
|
throws KeeperException, InterruptedException {
|
|
throws KeeperException, InterruptedException {
|
|
|
|
|
|
logger.info("{} not elected leader. Watching node:{}",
|
|
logger.info("{} not elected leader. Watching node:{}",
|
|
- leaderOffer.getNodePath(), neighborLeaderOffer.getNodePath());
|
|
|
|
|
|
+ getLeaderOffer().getNodePath(), neighborLeaderOffer.getNodePath());
|
|
|
|
|
|
/*
|
|
/*
|
|
* Make sure to pass an explicit Watcher because we could be sharing this
|
|
* Make sure to pass an explicit Watcher because we could be sharing this
|
|
@@ -270,7 +279,7 @@ public class LeaderElectionSupport implements Watcher {
|
|
state = State.ELECTED;
|
|
state = State.ELECTED;
|
|
dispatchEvent(EventType.ELECTED_START);
|
|
dispatchEvent(EventType.ELECTED_START);
|
|
|
|
|
|
- logger.info("Becoming leader with node:{}", leaderOffer.getNodePath());
|
|
|
|
|
|
+ logger.info("Becoming leader with node:{}", getLeaderOffer().getNodePath());
|
|
|
|
|
|
dispatchEvent(EventType.ELECTED_COMPLETE);
|
|
dispatchEvent(EventType.ELECTED_COMPLETE);
|
|
}
|
|
}
|
|
@@ -336,7 +345,7 @@ public class LeaderElectionSupport implements Watcher {
|
|
@Override
|
|
@Override
|
|
public void process(WatchedEvent event) {
|
|
public void process(WatchedEvent event) {
|
|
if (event.getType().equals(Watcher.Event.EventType.NodeDeleted)) {
|
|
if (event.getType().equals(Watcher.Event.EventType.NodeDeleted)) {
|
|
- if (!event.getPath().equals(leaderOffer.getNodePath())
|
|
|
|
|
|
+ if (!event.getPath().equals(getLeaderOffer().getNodePath())
|
|
&& state != State.STOP) {
|
|
&& state != State.STOP) {
|
|
logger.debug(
|
|
logger.debug(
|
|
"Node {} deleted. Need to run through the election process.",
|
|
"Node {} deleted. Need to run through the election process.",
|
|
@@ -384,8 +393,8 @@ public class LeaderElectionSupport implements Watcher {
|
|
|
|
|
|
@Override
|
|
@Override
|
|
public String toString() {
|
|
public String toString() {
|
|
- return "{ state:" + state + " leaderOffer:" + leaderOffer + " zooKeeper:"
|
|
|
|
- + zooKeeper + " hostName:" + hostName + " listeners:" + listeners
|
|
|
|
|
|
+ return "{ state:" + state + " leaderOffer:" + getLeaderOffer() + " zooKeeper:"
|
|
|
|
+ + zooKeeper + " hostName:" + getHostName() + " listeners:" + listeners
|
|
+ " }";
|
|
+ " }";
|
|
}
|
|
}
|
|
|
|
|
|
@@ -437,11 +446,11 @@ public class LeaderElectionSupport implements Watcher {
|
|
* The hostname of this process. Mostly used as a convenience for logging and
|
|
* The hostname of this process. Mostly used as a convenience for logging and
|
|
* to respond to {@link #getLeaderHostName()} requests.
|
|
* to respond to {@link #getLeaderHostName()} requests.
|
|
*/
|
|
*/
|
|
- public String getHostName() {
|
|
|
|
|
|
+ public synchronized String getHostName() {
|
|
return hostName;
|
|
return hostName;
|
|
}
|
|
}
|
|
|
|
|
|
- public void setHostName(String hostName) {
|
|
|
|
|
|
+ public synchronized void setHostName(String hostName) {
|
|
this.hostName = hostName;
|
|
this.hostName = hostName;
|
|
}
|
|
}
|
|
|
|
|