|
@@ -32,6 +32,7 @@ import java.util.HashMap;
|
|
|
import java.util.HashSet;
|
|
|
import java.util.Iterator;
|
|
|
import java.util.List;
|
|
|
+import java.util.Set;
|
|
|
import java.util.concurrent.atomic.AtomicLong;
|
|
|
import java.util.concurrent.ConcurrentHashMap;
|
|
|
import java.util.concurrent.ConcurrentLinkedQueue;
|
|
@@ -75,6 +76,8 @@ public class Leader {
|
|
|
|
|
|
final QuorumPeer self;
|
|
|
|
|
|
+ private boolean quorumFormed = false;
|
|
|
+
|
|
|
// the follower acceptor thread
|
|
|
volatile LearnerCnxAcceptor cnxAcceptor = null;
|
|
|
|
|
@@ -376,8 +379,6 @@ public class Leader {
|
|
|
LOG.info("NEWLEADER proposal has Zxid of "
|
|
|
+ Long.toHexString(newLeaderProposal.packet.getZxid()));
|
|
|
}
|
|
|
- outstandingProposals.put(newLeaderProposal.packet.getZxid(), newLeaderProposal);
|
|
|
- newLeaderProposal.ackSet.add(self.getId());
|
|
|
|
|
|
waitForEpochAck(self.getId(), leaderStateSummary);
|
|
|
self.setCurrentEpoch(epoch);
|
|
@@ -385,35 +386,24 @@ public class Leader {
|
|
|
// We have to get at least a majority of servers in sync with
|
|
|
// us. We do this by waiting for the NEWLEADER packet to get
|
|
|
// acknowledged
|
|
|
- while (!self.getQuorumVerifier().containsQuorum(newLeaderProposal.ackSet)){
|
|
|
- //while (newLeaderProposal.ackCount <= self.quorumPeers.size() / 2) {
|
|
|
- if (self.tick > self.initLimit) {
|
|
|
- // Followers aren't syncing fast enough,
|
|
|
- // renounce leadership!
|
|
|
- StringBuilder ackToString = new StringBuilder();
|
|
|
- for(Long id : newLeaderProposal.ackSet)
|
|
|
- ackToString.append(id + ": ");
|
|
|
-
|
|
|
- shutdown("Waiting for a quorum of followers, only synced with: " + ackToString);
|
|
|
- HashSet<Long> followerSet = new HashSet<Long>();
|
|
|
-
|
|
|
- for(LearnerHandler f : getLearners()) {
|
|
|
- if (self.getQuorumVerifier().getVotingMembers().containsKey(f.getSid())){
|
|
|
- followerSet.add(f.getSid());
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- if (self.getQuorumVerifier().containsQuorum(followerSet)) {
|
|
|
- //if (followers.size() >= self.quorumPeers.size() / 2) {
|
|
|
- LOG.warn("Enough followers present. "+
|
|
|
- "Perhaps the initTicks need to be increased.");
|
|
|
- }
|
|
|
- return;
|
|
|
+ try {
|
|
|
+ waitForNewLeaderAck(self.getId(), zk.getZxid(), LearnerType.PARTICIPANT);
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ shutdown("Waiting for a quorum of followers, only synced with sids: [ "
|
|
|
+ + getSidSetString(newLeaderProposal.ackSet) + " ]");
|
|
|
+ HashSet<Long> followerSet = new HashSet<Long>();
|
|
|
+ for (LearnerHandler f : learners)
|
|
|
+ followerSet.add(f.getSid());
|
|
|
+
|
|
|
+ if (self.getQuorumVerifier().containsQuorum(followerSet)) {
|
|
|
+ LOG.warn("Enough followers present. "
|
|
|
+ + "Perhaps the initTicks need to be increased.");
|
|
|
}
|
|
|
- Thread.sleep(self.tickTime);
|
|
|
- self.tick++;
|
|
|
+ return;
|
|
|
}
|
|
|
|
|
|
+ startZkServer();
|
|
|
+
|
|
|
/**
|
|
|
* WARNING: do not use this for anything other than QA testing
|
|
|
* on a real cluster. Specifically to enable verification that quorum
|
|
@@ -468,9 +458,8 @@ public class Leader {
|
|
|
if (!tickSkip && !self.getQuorumVerifier().containsQuorum(syncedSet)) {
|
|
|
//if (!tickSkip && syncedCount < self.quorumPeers.size() / 2) {
|
|
|
// Lost quorum, shutdown
|
|
|
- // TODO: message is wrong unless majority quorums used
|
|
|
- shutdown("Only " + syncedSet.size() + " followers, need "
|
|
|
- + (self.getVotingView().size() / 2));
|
|
|
+ shutdown("Not sufficient followers synced, only synced with sids: [ "
|
|
|
+ + getSidSetString(syncedSet) + " ]");
|
|
|
// make sure the order is the same!
|
|
|
// the leader goes to looking
|
|
|
return;
|
|
@@ -544,6 +533,15 @@ public class Leader {
|
|
|
LOG.trace("outstanding proposals all");
|
|
|
}
|
|
|
|
|
|
+ if ((zxid & 0xffffffffL) == 0) {
|
|
|
+ /*
|
|
|
+ * We no longer process NEWLEADER ack by this method. However,
|
|
|
+ * the learner sends ack back to the leader after it gets UPTODATE
|
|
|
+ * so we just ignore the message.
|
|
|
+ */
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
if (outstandingProposals.size() == 0) {
|
|
|
if (LOG.isDebugEnabled()) {
|
|
|
LOG.debug("outstanding is 0");
|
|
@@ -580,26 +578,17 @@ public class Leader {
|
|
|
if (p.request != null) {
|
|
|
toBeApplied.add(p);
|
|
|
}
|
|
|
- // We don't commit the new leader proposal
|
|
|
- if ((zxid & 0xffffffffL) != 0) {
|
|
|
- if (p.request == null) {
|
|
|
- LOG.warn("Going to commmit null request for proposal: {}", p);
|
|
|
- }
|
|
|
- commit(zxid);
|
|
|
- inform(p);
|
|
|
- zk.commitProcessor.commit(p.request);
|
|
|
- if(pendingSyncs.containsKey(zxid)){
|
|
|
- for(LearnerSyncRequest r: pendingSyncs.remove(zxid)) {
|
|
|
- sendSync(r);
|
|
|
- }
|
|
|
+
|
|
|
+ if (p.request == null) {
|
|
|
+ LOG.warn("Going to commmit null request for proposal: {}", p);
|
|
|
+ }
|
|
|
+ commit(zxid);
|
|
|
+ inform(p);
|
|
|
+ zk.commitProcessor.commit(p.request);
|
|
|
+ if(pendingSyncs.containsKey(zxid)){
|
|
|
+ for(LearnerSyncRequest r: pendingSyncs.remove(zxid)) {
|
|
|
+ sendSync(r);
|
|
|
}
|
|
|
- return;
|
|
|
- } else {
|
|
|
- lastCommitted = zxid;
|
|
|
- LOG.info("Have quorum of supporters; starting up and setting last processed zxid: 0x{}",
|
|
|
- Long.toHexString(zk.getZxid()));
|
|
|
- zk.startup();
|
|
|
- zk.getZKDatabase().setlastProcessedZxid(zk.getZxid());
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -923,6 +912,86 @@ public class Leader {
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Return a list of sid in set as string
|
|
|
+ */
|
|
|
+ private String getSidSetString(Set<Long> sidSet) {
|
|
|
+ StringBuilder sids = new StringBuilder();
|
|
|
+ Iterator<Long> iter = sidSet.iterator();
|
|
|
+ while (iter.hasNext()) {
|
|
|
+ sids.append(iter.next());
|
|
|
+ if (!iter.hasNext()) {
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ sids.append(",");
|
|
|
+ }
|
|
|
+ return sids.toString();
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Start up Leader ZooKeeper server and initialize zxid to the new epoch
|
|
|
+ */
|
|
|
+ private synchronized void startZkServer() {
|
|
|
+ // Update lastCommitted and Db's zxid to a value representing the new epoch
|
|
|
+ lastCommitted = zk.getZxid();
|
|
|
+ LOG.info("Have quorum of supporters, sids: [ "
|
|
|
+ + getSidSetString(newLeaderProposal.ackSet)
|
|
|
+ + " ]; starting up and setting last processed zxid: 0x{}",
|
|
|
+ Long.toHexString(zk.getZxid()));
|
|
|
+ zk.startup();
|
|
|
+ zk.getZKDatabase().setlastProcessedZxid(zk.getZxid());
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Process NEWLEADER ack of a given sid and wait until the leader receives
|
|
|
+ * sufficient acks.
|
|
|
+ *
|
|
|
+ * @param sid
|
|
|
+ * @param learnerType
|
|
|
+ * @throws InterruptedException
|
|
|
+ */
|
|
|
+ public void waitForNewLeaderAck(long sid, long zxid, LearnerType learnerType)
|
|
|
+ throws InterruptedException {
|
|
|
+
|
|
|
+ synchronized (newLeaderProposal.ackSet) {
|
|
|
+
|
|
|
+ if (quorumFormed) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ long currentZxid = newLeaderProposal.packet.getZxid();
|
|
|
+ if (zxid != currentZxid) {
|
|
|
+ LOG.error("NEWLEADER ACK from sid: " + sid
|
|
|
+ + " is from a different epoch - current 0x"
|
|
|
+ + Long.toHexString(currentZxid) + " receieved 0x"
|
|
|
+ + Long.toHexString(zxid));
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ if (learnerType == LearnerType.PARTICIPANT) {
|
|
|
+ newLeaderProposal.ackSet.add(sid);
|
|
|
+ }
|
|
|
+
|
|
|
+ if (self.getQuorumVerifier().containsQuorum(
|
|
|
+ newLeaderProposal.ackSet)) {
|
|
|
+ quorumFormed = true;
|
|
|
+ newLeaderProposal.ackSet.notifyAll();
|
|
|
+ } else {
|
|
|
+ long start = System.currentTimeMillis();
|
|
|
+ long cur = start;
|
|
|
+ long end = start + self.getInitLimit() * self.getTickTime();
|
|
|
+ while (!quorumFormed && cur < end) {
|
|
|
+ newLeaderProposal.ackSet.wait(end - cur);
|
|
|
+ cur = System.currentTimeMillis();
|
|
|
+ }
|
|
|
+ if (!quorumFormed) {
|
|
|
+ throw new InterruptedException(
|
|
|
+ "Timeout while waiting for NEWLEADER to be acked by quorum");
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
/**
|
|
|
* Get string representation of a given packet type
|