|
@@ -14,6 +14,7 @@
|
|
|
* See the License for the specific language governing permissions and
|
|
|
* limitations under the License.
|
|
|
*/
|
|
|
+
|
|
|
package org.apache.zookeeper.recipes.leader;
|
|
|
|
|
|
import java.util.ArrayList;
|
|
@@ -21,7 +22,6 @@ import java.util.Collections;
|
|
|
import java.util.HashSet;
|
|
|
import java.util.List;
|
|
|
import java.util.Set;
|
|
|
-
|
|
|
import org.apache.zookeeper.CreateMode;
|
|
|
import org.apache.zookeeper.KeeperException;
|
|
|
import org.apache.zookeeper.WatchedEvent;
|
|
@@ -33,11 +33,9 @@ import org.slf4j.Logger;
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
|
|
|
/**
|
|
|
- * <p>
|
|
|
* A leader election support library implementing the ZooKeeper election recipe.
|
|
|
- * </p>
|
|
|
- * <p>
|
|
|
- * This support library is meant to simplify the construction of an exclusive
|
|
|
+ *
|
|
|
+ * <p>This support library is meant to simplify the construction of an exclusive
|
|
|
* leader system on top of Apache ZooKeeper. Any application that can become the
|
|
|
* leader (usually a process that provides a service, exclusively) would
|
|
|
* configure an instance of this class with their hostname, at least one
|
|
@@ -47,12 +45,10 @@ import org.slf4j.LoggerFactory;
|
|
|
* ZooKeeper and create a leader offer. The library then determines if it has
|
|
|
* been elected the leader using the algorithm described below. The client
|
|
|
* application can follow all state transitions via the listener callback.
|
|
|
- * </p>
|
|
|
- * <p>
|
|
|
- * Leader election algorithm
|
|
|
- * </p>
|
|
|
- * <p>
|
|
|
- * The library starts in a START state. Through each state transition, a state
|
|
|
+ *
|
|
|
+ * <p>Leader election algorithm
|
|
|
+ *
|
|
|
+ * <p>The library starts in a START state. Through each state transition, a state
|
|
|
* start and a state complete event are sent to all listeners. When
|
|
|
* {@link #start()} is called, a leader offer is created in ZooKeeper. A leader
|
|
|
* offer is an ephemeral sequential node that indicates a process that can act
|
|
@@ -66,12 +62,10 @@ import org.slf4j.LoggerFactory;
|
|
|
* process again to see if it should become the leader. Note that sequence ID
|
|
|
* may not be contiguous due to failed processes. A process may revoke its offer
|
|
|
* to be the leader at any time by calling {@link #stop()}.
|
|
|
- * </p>
|
|
|
- * <p>
|
|
|
- * Guarantees (not) Made and Caveats
|
|
|
- * </p>
|
|
|
- * <p>
|
|
|
- * <ul>
|
|
|
+ *
|
|
|
+ * <p>Guarantees (not) Made and Caveats
|
|
|
+ *
|
|
|
+ * <p><ul>
|
|
|
* <li>It is possible for a (poorly implemented) process to create a leader
|
|
|
* offer, get the lowest sequence ID, but have something terrible occur where it
|
|
|
* maintains its connection to ZK (and thus its ephemeral leader offer node) but
|
|
@@ -94,376 +88,384 @@ import org.slf4j.LoggerFactory;
|
|
|
*/
|
|
|
public class LeaderElectionSupport implements Watcher {
|
|
|
|
|
|
- private static final Logger logger = LoggerFactory
|
|
|
- .getLogger(LeaderElectionSupport.class);
|
|
|
-
|
|
|
- private ZooKeeper zooKeeper;
|
|
|
-
|
|
|
- private State state;
|
|
|
- private Set<LeaderElectionAware> listeners;
|
|
|
-
|
|
|
- private String rootNodeName;
|
|
|
- private LeaderOffer leaderOffer;
|
|
|
- private String hostName;
|
|
|
-
|
|
|
- public LeaderElectionSupport() {
|
|
|
- state = State.STOP;
|
|
|
- listeners = Collections.synchronizedSet(new HashSet<LeaderElectionAware>());
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * <p>
|
|
|
- * Start the election process. This method will create a leader offer,
|
|
|
- * determine its status, and either become the leader or become ready. If an
|
|
|
- * instance of {@link ZooKeeper} has not yet been configured by the user, a
|
|
|
- * new instance is created using the connectString and sessionTime specified.
|
|
|
- * </p>
|
|
|
- * <p>
|
|
|
- * Any (anticipated) failures result in a failed event being sent to all
|
|
|
- * listeners.
|
|
|
- * </p>
|
|
|
- */
|
|
|
- public synchronized void start() {
|
|
|
- state = State.START;
|
|
|
- dispatchEvent(EventType.START);
|
|
|
-
|
|
|
- logger.info("Starting leader election support");
|
|
|
-
|
|
|
- if (zooKeeper == null) {
|
|
|
- throw new IllegalStateException(
|
|
|
- "No instance of zookeeper provided. Hint: use setZooKeeper()");
|
|
|
+ private static final Logger LOG = LoggerFactory.getLogger(LeaderElectionSupport.class);
|
|
|
+
|
|
|
+ private ZooKeeper zooKeeper;
|
|
|
+
|
|
|
+ private State state;
|
|
|
+ private Set<LeaderElectionAware> listeners;
|
|
|
+
|
|
|
+ private String rootNodeName;
|
|
|
+ private LeaderOffer leaderOffer;
|
|
|
+ private String hostName;
|
|
|
+
|
|
|
+ public LeaderElectionSupport() {
|
|
|
+ state = State.STOP;
|
|
|
+ listeners = Collections.synchronizedSet(new HashSet<>());
|
|
|
}
|
|
|
|
|
|
- if (hostName == null) {
|
|
|
- throw new IllegalStateException(
|
|
|
- "No hostname provided. Hint: use setHostName()");
|
|
|
+ /**
|
|
|
+ * <p>
|
|
|
+ * Start the election process. This method will create a leader offer,
|
|
|
+ * determine its status, and either become the leader or become ready. If an
|
|
|
+ * instance of {@link ZooKeeper} has not yet been configured by the user, a
|
|
|
+ * new instance is created using the connectString and sessionTime specified.
|
|
|
+ * </p>
|
|
|
+ * <p>
|
|
|
+ * Any (anticipated) failures result in a failed event being sent to all
|
|
|
+ * listeners.
|
|
|
+ * </p>
|
|
|
+ */
|
|
|
+ public synchronized void start() {
|
|
|
+ state = State.START;
|
|
|
+ dispatchEvent(EventType.START);
|
|
|
+
|
|
|
+ LOG.info("Starting leader election support");
|
|
|
+
|
|
|
+ if (zooKeeper == null) {
|
|
|
+ throw new IllegalStateException(
|
|
|
+ "No instance of zookeeper provided. Hint: use setZooKeeper()");
|
|
|
+ }
|
|
|
+
|
|
|
+ if (hostName == null) {
|
|
|
+ throw new IllegalStateException(
|
|
|
+ "No hostname provided. Hint: use setHostName()");
|
|
|
+ }
|
|
|
+
|
|
|
+ try {
|
|
|
+ makeOffer();
|
|
|
+ determineElectionStatus();
|
|
|
+ } catch (KeeperException | InterruptedException e) {
|
|
|
+ becomeFailed(e);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
- try {
|
|
|
- makeOffer();
|
|
|
- determineElectionStatus();
|
|
|
- } catch (KeeperException e) {
|
|
|
- becomeFailed(e);
|
|
|
- return;
|
|
|
- } catch (InterruptedException e) {
|
|
|
- becomeFailed(e);
|
|
|
- return;
|
|
|
+ /**
|
|
|
+ * Stops all election services, revokes any outstanding leader offers, and
|
|
|
+ * disconnects from ZooKeeper.
|
|
|
+ */
|
|
|
+ public synchronized void stop() {
|
|
|
+ state = State.STOP;
|
|
|
+ dispatchEvent(EventType.STOP_START);
|
|
|
+
|
|
|
+ LOG.info("Stopping leader election support");
|
|
|
+
|
|
|
+ if (leaderOffer != null) {
|
|
|
+ try {
|
|
|
+ zooKeeper.delete(leaderOffer.getNodePath(), -1);
|
|
|
+ LOG.info("Removed leader offer {}", leaderOffer.getNodePath());
|
|
|
+ } catch (InterruptedException | KeeperException e) {
|
|
|
+ becomeFailed(e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ dispatchEvent(EventType.STOP_COMPLETE);
|
|
|
}
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Stops all election services, revokes any outstanding leader offers, and
|
|
|
- * disconnects from ZooKeeper.
|
|
|
- */
|
|
|
- public synchronized void stop() {
|
|
|
- state = State.STOP;
|
|
|
- dispatchEvent(EventType.STOP_START);
|
|
|
-
|
|
|
- logger.info("Stopping leader election support");
|
|
|
-
|
|
|
- if (leaderOffer != null) {
|
|
|
- try {
|
|
|
- zooKeeper.delete(leaderOffer.getNodePath(), -1);
|
|
|
- logger.info("Removed leader offer {}", leaderOffer.getNodePath());
|
|
|
- } catch (InterruptedException e) {
|
|
|
- becomeFailed(e);
|
|
|
- } catch (KeeperException e) {
|
|
|
- becomeFailed(e);
|
|
|
- }
|
|
|
+
|
|
|
+ private void makeOffer() throws KeeperException, InterruptedException {
|
|
|
+ state = State.OFFER;
|
|
|
+ dispatchEvent(EventType.OFFER_START);
|
|
|
+
|
|
|
+ LeaderOffer newLeaderOffer = new LeaderOffer();
|
|
|
+ byte[] hostnameBytes;
|
|
|
+ synchronized (this) {
|
|
|
+ newLeaderOffer.setHostName(hostName);
|
|
|
+ hostnameBytes = hostName.getBytes();
|
|
|
+ newLeaderOffer.setNodePath(zooKeeper.create(rootNodeName + "/" + "n_",
|
|
|
+ hostnameBytes, ZooDefs.Ids.OPEN_ACL_UNSAFE,
|
|
|
+ CreateMode.EPHEMERAL_SEQUENTIAL));
|
|
|
+ leaderOffer = newLeaderOffer;
|
|
|
+ }
|
|
|
+ LOG.debug("Created leader offer {}", leaderOffer);
|
|
|
+
|
|
|
+ dispatchEvent(EventType.OFFER_COMPLETE);
|
|
|
}
|
|
|
|
|
|
- dispatchEvent(EventType.STOP_COMPLETE);
|
|
|
- }
|
|
|
-
|
|
|
- private void makeOffer() throws KeeperException, InterruptedException {
|
|
|
- state = State.OFFER;
|
|
|
- dispatchEvent(EventType.OFFER_START);
|
|
|
-
|
|
|
- LeaderOffer newLeaderOffer = new LeaderOffer();
|
|
|
- byte[] hostnameBytes;
|
|
|
- synchronized (this) {
|
|
|
- newLeaderOffer.setHostName(hostName);
|
|
|
- hostnameBytes = hostName.getBytes();
|
|
|
- newLeaderOffer.setNodePath(zooKeeper.create(rootNodeName + "/" + "n_",
|
|
|
- hostnameBytes, ZooDefs.Ids.OPEN_ACL_UNSAFE,
|
|
|
- CreateMode.EPHEMERAL_SEQUENTIAL));
|
|
|
- leaderOffer = newLeaderOffer;
|
|
|
+ private synchronized LeaderOffer getLeaderOffer() {
|
|
|
+ return leaderOffer;
|
|
|
}
|
|
|
- logger.debug("Created leader offer {}", leaderOffer);
|
|
|
|
|
|
- dispatchEvent(EventType.OFFER_COMPLETE);
|
|
|
- }
|
|
|
+ private void determineElectionStatus() throws KeeperException, InterruptedException {
|
|
|
|
|
|
- private synchronized LeaderOffer getLeaderOffer() {
|
|
|
- return leaderOffer;
|
|
|
- }
|
|
|
+ state = State.DETERMINE;
|
|
|
+ dispatchEvent(EventType.DETERMINE_START);
|
|
|
|
|
|
- private void determineElectionStatus() throws KeeperException,
|
|
|
- InterruptedException {
|
|
|
+ LeaderOffer currentLeaderOffer = getLeaderOffer();
|
|
|
|
|
|
- state = State.DETERMINE;
|
|
|
- dispatchEvent(EventType.DETERMINE_START);
|
|
|
+ String[] components = currentLeaderOffer.getNodePath().split("/");
|
|
|
|
|
|
- LeaderOffer currentLeaderOffer = getLeaderOffer();
|
|
|
+ currentLeaderOffer.setId(Integer.valueOf(components[components.length - 1].substring("n_".length())));
|
|
|
|
|
|
- String[] components = currentLeaderOffer.getNodePath().split("/");
|
|
|
+ List<LeaderOffer> leaderOffers = toLeaderOffers(zooKeeper.getChildren(rootNodeName, false));
|
|
|
|
|
|
- currentLeaderOffer.setId(Integer.valueOf(components[components.length - 1]
|
|
|
- .substring("n_".length())));
|
|
|
+ /*
|
|
|
+ * For each leader offer, find out where we fit in. If we're first, we
|
|
|
+ * become the leader. If we're not elected the leader, attempt to stat the
|
|
|
+ * offer just less than us. If they exist, watch for their failure, but if
|
|
|
+ * they don't, become the leader.
|
|
|
+ */
|
|
|
+ for (int i = 0; i < leaderOffers.size(); i++) {
|
|
|
+ LeaderOffer leaderOffer = leaderOffers.get(i);
|
|
|
|
|
|
- List<LeaderOffer> leaderOffers = toLeaderOffers(zooKeeper.getChildren(
|
|
|
- rootNodeName, false));
|
|
|
+ if (leaderOffer.getId().equals(currentLeaderOffer.getId())) {
|
|
|
+ LOG.debug("There are {} leader offers. I am {} in line.", leaderOffers.size(), i);
|
|
|
|
|
|
- /*
|
|
|
- * For each leader offer, find out where we fit in. If we're first, we
|
|
|
- * become the leader. If we're not elected the leader, attempt to stat the
|
|
|
- * offer just less than us. If they exist, watch for their failure, but if
|
|
|
- * they don't, become the leader.
|
|
|
- */
|
|
|
- for (int i = 0; i < leaderOffers.size(); i++) {
|
|
|
- LeaderOffer leaderOffer = leaderOffers.get(i);
|
|
|
+ dispatchEvent(EventType.DETERMINE_COMPLETE);
|
|
|
|
|
|
- if (leaderOffer.getId().equals(currentLeaderOffer.getId())) {
|
|
|
- logger.debug("There are {} leader offers. I am {} in line.",
|
|
|
- leaderOffers.size(), i);
|
|
|
+ if (i == 0) {
|
|
|
+ becomeLeader();
|
|
|
+ } else {
|
|
|
+ becomeReady(leaderOffers.get(i - 1));
|
|
|
+ }
|
|
|
|
|
|
- dispatchEvent(EventType.DETERMINE_COMPLETE);
|
|
|
+ /* Once we've figured out where we are, we're done. */
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
- if (i == 0) {
|
|
|
- becomeLeader();
|
|
|
+ private void becomeReady(LeaderOffer neighborLeaderOffer)
|
|
|
+ throws KeeperException, InterruptedException {
|
|
|
+
|
|
|
+ LOG.info(
|
|
|
+ "{} not elected leader. Watching node:{}",
|
|
|
+ getLeaderOffer().getNodePath(),
|
|
|
+ neighborLeaderOffer.getNodePath());
|
|
|
+
|
|
|
+ /*
|
|
|
+ * Make sure to pass an explicit Watcher because we could be sharing this
|
|
|
+ * zooKeeper instance with someone else.
|
|
|
+ */
|
|
|
+ Stat stat = zooKeeper.exists(neighborLeaderOffer.getNodePath(), this);
|
|
|
+
|
|
|
+ if (stat != null) {
|
|
|
+ dispatchEvent(EventType.READY_START);
|
|
|
+ LOG.debug(
|
|
|
+ "We're behind {} in line and they're alive. Keeping an eye on them.",
|
|
|
+ neighborLeaderOffer.getNodePath());
|
|
|
+ state = State.READY;
|
|
|
+ dispatchEvent(EventType.READY_COMPLETE);
|
|
|
} else {
|
|
|
- becomeReady(leaderOffers.get(i - 1));
|
|
|
+ /*
|
|
|
+ * If the stat fails, the node has gone missing between the call to
|
|
|
+ * getChildren() and exists(). We need to try and become the leader.
|
|
|
+ */
|
|
|
+ LOG.info(
|
|
|
+ "We were behind {} but it looks like they died. Back to determination.",
|
|
|
+ neighborLeaderOffer.getNodePath());
|
|
|
+ determineElectionStatus();
|
|
|
}
|
|
|
|
|
|
- /* Once we've figured out where we are, we're done. */
|
|
|
- break;
|
|
|
- }
|
|
|
}
|
|
|
- }
|
|
|
|
|
|
- private void becomeReady(LeaderOffer neighborLeaderOffer)
|
|
|
- throws KeeperException, InterruptedException {
|
|
|
+ private void becomeLeader() {
|
|
|
+ state = State.ELECTED;
|
|
|
+ dispatchEvent(EventType.ELECTED_START);
|
|
|
+
|
|
|
+ LOG.info("Becoming leader with node:{}", getLeaderOffer().getNodePath());
|
|
|
+
|
|
|
+ dispatchEvent(EventType.ELECTED_COMPLETE);
|
|
|
+ }
|
|
|
+
|
|
|
+ private void becomeFailed(Exception e) {
|
|
|
+ LOG.error("Failed in state {} - Exception:{}", state, e);
|
|
|
|
|
|
- logger.info("{} not elected leader. Watching node:{}",
|
|
|
- getLeaderOffer().getNodePath(), neighborLeaderOffer.getNodePath());
|
|
|
+ state = State.FAILED;
|
|
|
+ dispatchEvent(EventType.FAILED);
|
|
|
+ }
|
|
|
|
|
|
- /*
|
|
|
- * Make sure to pass an explicit Watcher because we could be sharing this
|
|
|
- * zooKeeper instance with someone else.
|
|
|
+ /**
|
|
|
+ * Fetch the (user supplied) hostname of the current leader. Note that by the
|
|
|
+ * time this method returns, state could have changed so do not depend on this
|
|
|
+ * to be strongly consistent. This method has to read all leader offers from
|
|
|
+ * ZooKeeper to deterime who the leader is (i.e. there is no caching) so
|
|
|
+ * consider the performance implications of frequent invocation. If there are
|
|
|
+ * no leader offers this method returns null.
|
|
|
+ *
|
|
|
+ * @return hostname of the current leader
|
|
|
+ * @throws KeeperException
|
|
|
+ * @throws InterruptedException
|
|
|
*/
|
|
|
- Stat stat = zooKeeper.exists(neighborLeaderOffer.getNodePath(), this);
|
|
|
-
|
|
|
- if (stat != null) {
|
|
|
- dispatchEvent(EventType.READY_START);
|
|
|
- logger.debug(
|
|
|
- "We're behind {} in line and they're alive. Keeping an eye on them.",
|
|
|
- neighborLeaderOffer.getNodePath());
|
|
|
- state = State.READY;
|
|
|
- dispatchEvent(EventType.READY_COMPLETE);
|
|
|
- } else {
|
|
|
- /*
|
|
|
- * If the stat fails, the node has gone missing between the call to
|
|
|
- * getChildren() and exists(). We need to try and become the leader.
|
|
|
- */
|
|
|
- logger
|
|
|
- .info(
|
|
|
- "We were behind {} but it looks like they died. Back to determination.",
|
|
|
- neighborLeaderOffer.getNodePath());
|
|
|
- determineElectionStatus();
|
|
|
+ public String getLeaderHostName() throws KeeperException, InterruptedException {
|
|
|
+
|
|
|
+ List<LeaderOffer> leaderOffers = toLeaderOffers(zooKeeper.getChildren(rootNodeName, false));
|
|
|
+
|
|
|
+ if (leaderOffers.size() > 0) {
|
|
|
+ return leaderOffers.get(0).getHostName();
|
|
|
+ }
|
|
|
+
|
|
|
+ return null;
|
|
|
}
|
|
|
|
|
|
- }
|
|
|
+ private List<LeaderOffer> toLeaderOffers(List<String> strings)
|
|
|
+ throws KeeperException, InterruptedException {
|
|
|
+
|
|
|
+ List<LeaderOffer> leaderOffers = new ArrayList<>(strings.size());
|
|
|
+
|
|
|
+ /*
|
|
|
+ * Turn each child of rootNodeName into a leader offer. This is a tuple of
|
|
|
+ * the sequence number and the node name.
|
|
|
+ */
|
|
|
+ for (String offer : strings) {
|
|
|
+ String hostName = new String(zooKeeper.getData(rootNodeName + "/" + offer, false, null));
|
|
|
|
|
|
- private void becomeLeader() {
|
|
|
- state = State.ELECTED;
|
|
|
- dispatchEvent(EventType.ELECTED_START);
|
|
|
+ leaderOffers.add(new LeaderOffer(
|
|
|
+ Integer.valueOf(offer.substring("n_".length())),
|
|
|
+ rootNodeName + "/" + offer, hostName));
|
|
|
+ }
|
|
|
|
|
|
- logger.info("Becoming leader with node:{}", getLeaderOffer().getNodePath());
|
|
|
+ /*
|
|
|
+ * We sort leader offers by sequence number (which may not be zero-based or
|
|
|
+ * contiguous) and keep their paths handy for setting watches.
|
|
|
+ */
|
|
|
+ Collections.sort(leaderOffers, new LeaderOffer.IdComparator());
|
|
|
|
|
|
- dispatchEvent(EventType.ELECTED_COMPLETE);
|
|
|
- }
|
|
|
+ return leaderOffers;
|
|
|
+ }
|
|
|
|
|
|
- private void becomeFailed(Exception e) {
|
|
|
- logger.error("Failed in state {} - Exception:{}", state, e);
|
|
|
+ @Override
|
|
|
+ public void process(WatchedEvent event) {
|
|
|
+ if (event.getType().equals(Watcher.Event.EventType.NodeDeleted)) {
|
|
|
+ if (!event.getPath().equals(getLeaderOffer().getNodePath())
|
|
|
+ && state != State.STOP) {
|
|
|
+ LOG.debug(
|
|
|
+ "Node {} deleted. Need to run through the election process.",
|
|
|
+ event.getPath());
|
|
|
+ try {
|
|
|
+ determineElectionStatus();
|
|
|
+ } catch (KeeperException | InterruptedException e) {
|
|
|
+ becomeFailed(e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
- state = State.FAILED;
|
|
|
- dispatchEvent(EventType.FAILED);
|
|
|
- }
|
|
|
+ private void dispatchEvent(EventType eventType) {
|
|
|
+ LOG.debug("Dispatching event:{}", eventType);
|
|
|
|
|
|
- /**
|
|
|
- * Fetch the (user supplied) hostname of the current leader. Note that by the
|
|
|
- * time this method returns, state could have changed so do not depend on this
|
|
|
- * to be strongly consistent. This method has to read all leader offers from
|
|
|
- * ZooKeeper to deterime who the leader is (i.e. there is no caching) so
|
|
|
- * consider the performance implications of frequent invocation. If there are
|
|
|
- * no leader offers this method returns null.
|
|
|
- *
|
|
|
- * @return hostname of the current leader
|
|
|
- * @throws KeeperException
|
|
|
- * @throws InterruptedException
|
|
|
- */
|
|
|
- public String getLeaderHostName() throws KeeperException,
|
|
|
- InterruptedException {
|
|
|
+ synchronized (listeners) {
|
|
|
+ if (listeners.size() > 0) {
|
|
|
+ for (LeaderElectionAware observer : listeners) {
|
|
|
+ observer.onElectionEvent(eventType);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
- List<LeaderOffer> leaderOffers = toLeaderOffers(zooKeeper.getChildren(
|
|
|
- rootNodeName, false));
|
|
|
+ /**
|
|
|
+ * Adds {@code listener} to the list of listeners who will receive events.
|
|
|
+ *
|
|
|
+ * @param listener
|
|
|
+ */
|
|
|
+ public void addListener(LeaderElectionAware listener) {
|
|
|
+ listeners.add(listener);
|
|
|
+ }
|
|
|
|
|
|
- if (leaderOffers.size() > 0) {
|
|
|
- return leaderOffers.get(0).getHostName();
|
|
|
+ /**
|
|
|
+ * Remove {@code listener} from the list of listeners who receive events.
|
|
|
+ *
|
|
|
+ * @param listener
|
|
|
+ */
|
|
|
+ public void removeListener(LeaderElectionAware listener) {
|
|
|
+ listeners.remove(listener);
|
|
|
}
|
|
|
|
|
|
- return null;
|
|
|
- }
|
|
|
+ @Override
|
|
|
+ public String toString() {
|
|
|
+ return "{"
|
|
|
+ + " state:" + state
|
|
|
+ + " leaderOffer:" + getLeaderOffer()
|
|
|
+ + " zooKeeper:" + zooKeeper
|
|
|
+ + " hostName:" + getHostName()
|
|
|
+ + " listeners:" + listeners
|
|
|
+ + " }";
|
|
|
+ }
|
|
|
|
|
|
- private List<LeaderOffer> toLeaderOffers(List<String> strings)
|
|
|
- throws KeeperException, InterruptedException {
|
|
|
+ /**
|
|
|
+ * <p>
|
|
|
+ * Gets the ZooKeeper root node to use for this service.
|
|
|
+ * </p>
|
|
|
+ * <p>
|
|
|
+ * For instance, a root node of {@code /mycompany/myservice} would be the
|
|
|
+ * parent of all leader offers for this service. Obviously all processes that
|
|
|
+ * wish to contend for leader status need to use the same root node. Note: We
|
|
|
+ * assume this node already exists.
|
|
|
+ * </p>
|
|
|
+ *
|
|
|
+ * @return a znode path
|
|
|
+ */
|
|
|
+ public String getRootNodeName() {
|
|
|
+ return rootNodeName;
|
|
|
+ }
|
|
|
|
|
|
- List<LeaderOffer> leaderOffers = new ArrayList<LeaderOffer>(strings.size());
|
|
|
+ /**
|
|
|
+ * <p>
|
|
|
+ * Sets the ZooKeeper root node to use for this service.
|
|
|
+ * </p>
|
|
|
+ * <p>
|
|
|
+ * For instance, a root node of {@code /mycompany/myservice} would be the
|
|
|
+ * parent of all leader offers for this service. Obviously all processes that
|
|
|
+ * wish to contend for leader status need to use the same root node. Note: We
|
|
|
+ * assume this node already exists.
|
|
|
+ * </p>
|
|
|
+ */
|
|
|
+ public void setRootNodeName(String rootNodeName) {
|
|
|
+ this.rootNodeName = rootNodeName;
|
|
|
+ }
|
|
|
|
|
|
- /*
|
|
|
- * Turn each child of rootNodeName into a leader offer. This is a tuple of
|
|
|
- * the sequence number and the node name.
|
|
|
+ /**
|
|
|
+ * The {@link ZooKeeper} instance to use for all operations. Provided this
|
|
|
+ * overrides any connectString or sessionTimeout set.
|
|
|
*/
|
|
|
- for (String offer : strings) {
|
|
|
- String hostName = new String(zooKeeper.getData(
|
|
|
- rootNodeName + "/" + offer, false, null));
|
|
|
+ public ZooKeeper getZooKeeper() {
|
|
|
+ return zooKeeper;
|
|
|
+ }
|
|
|
|
|
|
- leaderOffers.add(new LeaderOffer(Integer.valueOf(offer.substring("n_"
|
|
|
- .length())), rootNodeName + "/" + offer, hostName));
|
|
|
+ public void setZooKeeper(ZooKeeper zooKeeper) {
|
|
|
+ this.zooKeeper = zooKeeper;
|
|
|
}
|
|
|
|
|
|
- /*
|
|
|
- * We sort leader offers by sequence number (which may not be zero-based or
|
|
|
- * contiguous) and keep their paths handy for setting watches.
|
|
|
+ /**
|
|
|
+ * The hostname of this process. Mostly used as a convenience for logging and
|
|
|
+ * to respond to {@link #getLeaderHostName()} requests.
|
|
|
*/
|
|
|
- Collections.sort(leaderOffers, new LeaderOffer.IdComparator());
|
|
|
-
|
|
|
- return leaderOffers;
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public void process(WatchedEvent event) {
|
|
|
- if (event.getType().equals(Watcher.Event.EventType.NodeDeleted)) {
|
|
|
- if (!event.getPath().equals(getLeaderOffer().getNodePath())
|
|
|
- && state != State.STOP) {
|
|
|
- logger.debug(
|
|
|
- "Node {} deleted. Need to run through the election process.",
|
|
|
- event.getPath());
|
|
|
- try {
|
|
|
- determineElectionStatus();
|
|
|
- } catch (KeeperException e) {
|
|
|
- becomeFailed(e);
|
|
|
- } catch (InterruptedException e) {
|
|
|
- becomeFailed(e);
|
|
|
- }
|
|
|
- }
|
|
|
+ public synchronized String getHostName() {
|
|
|
+ return hostName;
|
|
|
}
|
|
|
- }
|
|
|
|
|
|
- private void dispatchEvent(EventType eventType) {
|
|
|
- logger.debug("Dispatching event:{}", eventType);
|
|
|
+ public synchronized void setHostName(String hostName) {
|
|
|
+ this.hostName = hostName;
|
|
|
+ }
|
|
|
|
|
|
- synchronized (listeners) {
|
|
|
- if (listeners.size() > 0) {
|
|
|
- for (LeaderElectionAware observer : listeners) {
|
|
|
- observer.onElectionEvent(eventType);
|
|
|
- }
|
|
|
- }
|
|
|
+ /**
|
|
|
+ * The type of event.
|
|
|
+ */
|
|
|
+ public enum EventType {
|
|
|
+ START,
|
|
|
+ OFFER_START,
|
|
|
+ OFFER_COMPLETE,
|
|
|
+ DETERMINE_START,
|
|
|
+ DETERMINE_COMPLETE,
|
|
|
+ ELECTED_START,
|
|
|
+ ELECTED_COMPLETE,
|
|
|
+ READY_START,
|
|
|
+ READY_COMPLETE,
|
|
|
+ FAILED,
|
|
|
+ STOP_START,
|
|
|
+ STOP_COMPLETE,
|
|
|
}
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Adds {@code listener} to the list of listeners who will receive events.
|
|
|
- *
|
|
|
- * @param listener
|
|
|
- */
|
|
|
- public void addListener(LeaderElectionAware listener) {
|
|
|
- listeners.add(listener);
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Remove {@code listener} from the list of listeners who receive events.
|
|
|
- *
|
|
|
- * @param listener
|
|
|
- */
|
|
|
- public void removeListener(LeaderElectionAware listener) {
|
|
|
- listeners.remove(listener);
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public String toString() {
|
|
|
- return "{ state:" + state + " leaderOffer:" + getLeaderOffer() + " zooKeeper:"
|
|
|
- + zooKeeper + " hostName:" + getHostName() + " listeners:" + listeners
|
|
|
- + " }";
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * <p>
|
|
|
- * Gets the ZooKeeper root node to use for this service.
|
|
|
- * </p>
|
|
|
- * <p>
|
|
|
- * For instance, a root node of {@code /mycompany/myservice} would be the
|
|
|
- * parent of all leader offers for this service. Obviously all processes that
|
|
|
- * wish to contend for leader status need to use the same root node. Note: We
|
|
|
- * assume this node already exists.
|
|
|
- * </p>
|
|
|
- *
|
|
|
- * @return a znode path
|
|
|
- */
|
|
|
- public String getRootNodeName() {
|
|
|
- return rootNodeName;
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * <p>
|
|
|
- * Sets the ZooKeeper root node to use for this service.
|
|
|
- * </p>
|
|
|
- * <p>
|
|
|
- * For instance, a root node of {@code /mycompany/myservice} would be the
|
|
|
- * parent of all leader offers for this service. Obviously all processes that
|
|
|
- * wish to contend for leader status need to use the same root node. Note: We
|
|
|
- * assume this node already exists.
|
|
|
- * </p>
|
|
|
- */
|
|
|
- public void setRootNodeName(String rootNodeName) {
|
|
|
- this.rootNodeName = rootNodeName;
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * The {@link ZooKeeper} instance to use for all operations. Provided this
|
|
|
- * overrides any connectString or sessionTimeout set.
|
|
|
- */
|
|
|
- public ZooKeeper getZooKeeper() {
|
|
|
- return zooKeeper;
|
|
|
- }
|
|
|
-
|
|
|
- public void setZooKeeper(ZooKeeper zooKeeper) {
|
|
|
- this.zooKeeper = zooKeeper;
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * The hostname of this process. Mostly used as a convenience for logging and
|
|
|
- * to respond to {@link #getLeaderHostName()} requests.
|
|
|
- */
|
|
|
- public synchronized String getHostName() {
|
|
|
- return hostName;
|
|
|
- }
|
|
|
-
|
|
|
- public synchronized void setHostName(String hostName) {
|
|
|
- this.hostName = hostName;
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * The type of event.
|
|
|
- */
|
|
|
- public static enum EventType {
|
|
|
- START, OFFER_START, OFFER_COMPLETE, DETERMINE_START, DETERMINE_COMPLETE, ELECTED_START, ELECTED_COMPLETE, READY_START, READY_COMPLETE, FAILED, STOP_START, STOP_COMPLETE,
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * The internal state of the election support service.
|
|
|
- */
|
|
|
- public static enum State {
|
|
|
- START, OFFER, DETERMINE, ELECTED, READY, FAILED, STOP
|
|
|
- }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * The internal state of the election support service.
|
|
|
+ */
|
|
|
+ public enum State {
|
|
|
+ START,
|
|
|
+ OFFER,
|
|
|
+ DETERMINE,
|
|
|
+ ELECTED,
|
|
|
+ READY,
|
|
|
+ FAILED,
|
|
|
+ STOP
|
|
|
+ }
|
|
|
+
|
|
|
}
|