|
@@ -25,6 +25,8 @@ import java.net.InetSocketAddress;
|
|
|
import java.net.SocketException;
|
|
|
import java.nio.ByteBuffer;
|
|
|
import java.util.ArrayList;
|
|
|
+import java.util.Collections;
|
|
|
+import java.util.HashMap;
|
|
|
import java.util.List;
|
|
|
import java.util.Map;
|
|
|
|
|
@@ -93,22 +95,54 @@ public class QuorumPeer extends Thread implements QuorumStats.Provider {
|
|
|
this.electionAddr = null;
|
|
|
}
|
|
|
|
|
|
+ public QuorumServer(long id, InetSocketAddress addr,
|
|
|
+ InetSocketAddress electionAddr, LearnerType type) {
|
|
|
+ this.id = id;
|
|
|
+ this.addr = addr;
|
|
|
+ this.electionAddr = electionAddr;
|
|
|
+ this.type = type;
|
|
|
+ }
|
|
|
+
|
|
|
public InetSocketAddress addr;
|
|
|
|
|
|
public InetSocketAddress electionAddr;
|
|
|
|
|
|
public long id;
|
|
|
+
|
|
|
+ public LearnerType type = LearnerType.PARTICIPANT;
|
|
|
}
|
|
|
|
|
|
public enum ServerState {
|
|
|
- LOOKING, FOLLOWING, LEADING;
|
|
|
+ LOOKING, FOLLOWING, LEADING, OBSERVING;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * A peer can either be participating, which implies that it is willing to
|
|
|
+ * both vote in instances of consensus and to elect or become a Leader, or
|
|
|
+ * it may be observing in which case it isn't.
|
|
|
+ *
|
|
|
+ * We need this distinction to decide which ServerState to move to when
|
|
|
+ * conditions change (e.g. which state to become after LOOKING).
|
|
|
+ */
|
|
|
+ public enum LearnerType {
|
|
|
+ PARTICIPANT, OBSERVER;
|
|
|
+ }
|
|
|
+
|
|
|
+ private LearnerType peerType = LearnerType.PARTICIPANT;
|
|
|
+
|
|
|
+ public LearnerType getPeerType() {
|
|
|
+ return peerType;
|
|
|
+ }
|
|
|
+
|
|
|
+ public void setPeerType(LearnerType p) {
|
|
|
+ peerType = p;
|
|
|
}
|
|
|
/**
|
|
|
* The servers that make up the cluster
|
|
|
*/
|
|
|
- Map<Long, QuorumServer> quorumPeers;
|
|
|
+ protected Map<Long, QuorumServer> quorumPeers;
|
|
|
public int getQuorumSize(){
|
|
|
- return quorumPeers.size();
|
|
|
+ return getVotingView().size();
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -226,6 +260,11 @@ public class QuorumPeer extends Thread implements QuorumStats.Provider {
|
|
|
// This can happen in state transitions,
|
|
|
// just ignore the request
|
|
|
}
|
|
|
+ break;
|
|
|
+ case OBSERVING:
|
|
|
+ // Do nothing, Observers keep themselves to
|
|
|
+ // themselves.
|
|
|
+ break;
|
|
|
}
|
|
|
packet.setData(b);
|
|
|
udpSocket.send(packet);
|
|
@@ -233,7 +272,7 @@ public class QuorumPeer extends Thread implements QuorumStats.Provider {
|
|
|
packet.setLength(b.length);
|
|
|
}
|
|
|
} catch (Exception e) {
|
|
|
- LOG.warn("Unexpected exception",e);
|
|
|
+ LOG.warn("Unexpected exception in ResponderThread",e);
|
|
|
} finally {
|
|
|
LOG.warn("QuorumPeer responder thread exited");
|
|
|
}
|
|
@@ -282,7 +321,8 @@ public class QuorumPeer extends Thread implements QuorumStats.Provider {
|
|
|
long myid, int tickTime, int initLimit, int syncLimit,
|
|
|
NIOServerCnxn.Factory cnxnFactory) throws IOException {
|
|
|
this(quorumPeers, dataDir, dataLogDir, electionType, myid, tickTime,
|
|
|
- initLimit, syncLimit, cnxnFactory, new QuorumMaj(quorumPeers.size()));
|
|
|
+ initLimit, syncLimit, cnxnFactory,
|
|
|
+ new QuorumMaj(countParticipants(quorumPeers)));
|
|
|
}
|
|
|
|
|
|
public QuorumPeer(Map<Long, QuorumServer> quorumPeers, File dataDir,
|
|
@@ -300,7 +340,7 @@ public class QuorumPeer extends Thread implements QuorumStats.Provider {
|
|
|
this.syncLimit = syncLimit;
|
|
|
this.logFactory = new FileTxnSnapLog(dataLogDir, dataDir);
|
|
|
if(quorumConfig == null)
|
|
|
- this.quorumConfig = new QuorumMaj(quorumPeers.size());
|
|
|
+ this.quorumConfig = new QuorumMaj(countParticipants(quorumPeers));
|
|
|
else this.quorumConfig = quorumConfig;
|
|
|
}
|
|
|
|
|
@@ -310,8 +350,10 @@ public class QuorumPeer extends Thread implements QuorumStats.Provider {
|
|
|
|
|
|
@Override
|
|
|
public synchronized void start() {
|
|
|
- cnxnFactory.start();
|
|
|
- startLeaderElection();
|
|
|
+ cnxnFactory.start();
|
|
|
+ if (getPeerType() == LearnerType.PARTICIPANT) {
|
|
|
+ startLeaderElection();
|
|
|
+ }
|
|
|
super.start();
|
|
|
}
|
|
|
|
|
@@ -323,7 +365,7 @@ public class QuorumPeer extends Thread implements QuorumStats.Provider {
|
|
|
}
|
|
|
synchronized public void startLeaderElection() {
|
|
|
currentVote = new Vote(myid, getLastLoggedZxid());
|
|
|
- for (QuorumServer p : quorumPeers.values()) {
|
|
|
+ for (QuorumServer p : getView().values()) {
|
|
|
if (p.id == myid) {
|
|
|
myQuorumAddr = p.addr;
|
|
|
break;
|
|
@@ -344,6 +386,20 @@ public class QuorumPeer extends Thread implements QuorumStats.Provider {
|
|
|
this.electionAlg = createElectionAlgorithm(electionType);
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Count the number of servers in the map whose type is PARTICIPANT.
|
|
|
+ * @param peers
|
|
|
+ * @return The number of PARTICIPANT servers in the map
|
|
|
+ */
|
|
|
+ protected static int countParticipants(Map<Long,QuorumServer> peers) {
|
|
|
+ int count = 0;
|
|
|
+ for (QuorumServer q : peers.values()) {
|
|
|
+ if (q.type == LearnerType.PARTICIPANT) {
|
|
|
+ count++;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return count;
|
|
|
+ }
|
|
|
|
|
|
/**
|
|
|
* This constructor is only used by the existing unit test code.
|
|
@@ -357,7 +413,7 @@ public class QuorumPeer extends Thread implements QuorumStats.Provider {
|
|
|
this(quorumPeers, snapDir, logDir, electionAlg,
|
|
|
myid,tickTime, initLimit,syncLimit,
|
|
|
new NIOServerCnxn.Factory(clientPort),
|
|
|
- new QuorumMaj(quorumPeers.size()));
|
|
|
+ new QuorumMaj(countParticipants(quorumPeers)));
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -380,6 +436,7 @@ public class QuorumPeer extends Thread implements QuorumStats.Provider {
|
|
|
}
|
|
|
public Follower follower;
|
|
|
public Leader leader;
|
|
|
+ public Observer observer;
|
|
|
|
|
|
protected Follower makeFollower(FileTxnSnapLog logFactory) throws IOException {
|
|
|
return new Follower(this, new FollowerZooKeeperServer(logFactory,
|
|
@@ -390,9 +447,15 @@ public class QuorumPeer extends Thread implements QuorumStats.Provider {
|
|
|
return new Leader(this, new LeaderZooKeeperServer(logFactory,
|
|
|
this,new ZooKeeperServer.BasicDataTreeBuilder()));
|
|
|
}
|
|
|
+
|
|
|
+ protected Observer makeObserver(FileTxnSnapLog logFactory) throws IOException {
|
|
|
+ return new Observer(this, new ObserverZooKeeperServer(logFactory,
|
|
|
+ this, new ZooKeeperServer.BasicDataTreeBuilder()));
|
|
|
+ }
|
|
|
|
|
|
private Election createElectionAlgorithm(int electionAlgorithm){
|
|
|
Election le=null;
|
|
|
+
|
|
|
//TODO: use a factory rather than a switch
|
|
|
switch (electionAlgorithm) {
|
|
|
case 0:
|
|
@@ -423,6 +486,8 @@ public class QuorumPeer extends Thread implements QuorumStats.Provider {
|
|
|
protected Election makeLEStrategy(){
|
|
|
LOG.debug("Initializing leader election protocol...");
|
|
|
|
|
|
+ // LeaderElection is the only implementation that correctly
|
|
|
+ // transitions between LOOKING and OBSERVING
|
|
|
if(electionAlg==null)
|
|
|
return new LeaderElection(this);
|
|
|
return electionAlg;
|
|
@@ -435,12 +500,18 @@ public class QuorumPeer extends Thread implements QuorumStats.Provider {
|
|
|
synchronized protected void setFollower(Follower newFollower){
|
|
|
follower=newFollower;
|
|
|
}
|
|
|
+
|
|
|
+ synchronized protected void setObserver(Observer newObserver){
|
|
|
+ observer=newObserver;
|
|
|
+ }
|
|
|
|
|
|
synchronized public ZooKeeperServer getActiveServer(){
|
|
|
if(leader!=null)
|
|
|
return leader.zk;
|
|
|
else if(follower!=null)
|
|
|
return follower.zk;
|
|
|
+ else if (observer != null)
|
|
|
+ return observer.zk;
|
|
|
return null;
|
|
|
}
|
|
|
|
|
@@ -491,6 +562,19 @@ public class QuorumPeer extends Thread implements QuorumStats.Provider {
|
|
|
setPeerState(ServerState.LOOKING);
|
|
|
}
|
|
|
break;
|
|
|
+ case OBSERVING:
|
|
|
+ try {
|
|
|
+ LOG.info("OBSERVING");
|
|
|
+ setObserver(makeObserver(logFactory));
|
|
|
+ observer.observeLeader();
|
|
|
+ } catch (Exception e) {
|
|
|
+ LOG.warn("Unexpected exception",e );
|
|
|
+ } finally {
|
|
|
+ observer.shutdown();
|
|
|
+ setObserver(null);
|
|
|
+ setPeerState(ServerState.LOOKING);
|
|
|
+ }
|
|
|
+ break;
|
|
|
case FOLLOWING:
|
|
|
try {
|
|
|
LOG.info("FOLLOWING");
|
|
@@ -549,11 +633,42 @@ public class QuorumPeer extends Thread implements QuorumStats.Provider {
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * A 'view' is a node's current opinion of the membership of the
|
|
|
- * ensemble.
|
|
|
+ * A 'view' is a node's current opinion of the membership of the entire
|
|
|
+ * ensemble.
|
|
|
*/
|
|
|
public Map<Long,QuorumPeer.QuorumServer> getView() {
|
|
|
- return this.quorumPeers;
|
|
|
+ return Collections.unmodifiableMap(this.quorumPeers);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Observers are not contained in this view, only servers whose
|
|
|
+ * type is LearnerType.PARTICIPANT.
|
|
|
+ */
|
|
|
+ public Map<Long,QuorumPeer.QuorumServer> getVotingView() {
|
|
|
+ Map<Long,QuorumPeer.QuorumServer> ret =
|
|
|
+ new HashMap<Long, QuorumPeer.QuorumServer>();
|
|
|
+ Map<Long,QuorumPeer.QuorumServer> view = getView();
|
|
|
+ for (QuorumServer server : view.values()) {
|
|
|
+ if (server.type == LearnerType.PARTICIPANT) {
|
|
|
+ ret.put(server.id, server);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return ret;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Returns only the servers whose type is LearnerType.OBSERVER; participants are excluded.
|
|
|
+ */
|
|
|
+ public Map<Long,QuorumPeer.QuorumServer> getObservingView() {
|
|
|
+ Map<Long,QuorumPeer.QuorumServer> ret =
|
|
|
+ new HashMap<Long, QuorumPeer.QuorumServer>();
|
|
|
+ Map<Long,QuorumPeer.QuorumServer> view = getView();
|
|
|
+ for (QuorumServer server : view.values()) {
|
|
|
+ if (server.type == LearnerType.OBSERVER) {
|
|
|
+ ret.put(server.id, server);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return ret;
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -565,6 +680,9 @@ public class QuorumPeer extends Thread implements QuorumStats.Provider {
|
|
|
return this.quorumPeers.containsKey(sid);
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Only used by QuorumStats at the moment
|
|
|
+ */
|
|
|
public String[] getQuorumPeers() {
|
|
|
List<String> l = new ArrayList<String>();
|
|
|
synchronized (this) {
|
|
@@ -594,6 +712,8 @@ public class QuorumPeer extends Thread implements QuorumStats.Provider {
|
|
|
return QuorumStats.Provider.LEADING_STATE;
|
|
|
case FOLLOWING:
|
|
|
return QuorumStats.Provider.FOLLOWING_STATE;
|
|
|
+ case OBSERVING:
|
|
|
+ return QuorumStats.Provider.OBSERVING_STATE;
|
|
|
}
|
|
|
return QuorumStats.Provider.UNKNOWN_STATE;
|
|
|
}
|