|
@@ -32,7 +32,6 @@ import java.util.HashMap;
|
|
|
import java.util.HashSet;
|
|
|
import java.util.Iterator;
|
|
|
import java.util.List;
|
|
|
-import java.util.Set;
|
|
|
import java.util.concurrent.atomic.AtomicLong;
|
|
|
import java.util.concurrent.ConcurrentHashMap;
|
|
|
import java.util.concurrent.ConcurrentLinkedQueue;
|
|
@@ -76,8 +75,6 @@ public class Leader {
|
|
|
|
|
|
final QuorumPeer self;
|
|
|
|
|
|
- private boolean quorumFormed = false;
|
|
|
-
|
|
|
// the follower acceptor thread
|
|
|
volatile LearnerCnxAcceptor cnxAcceptor = null;
|
|
|
|
|
@@ -379,6 +376,8 @@ public class Leader {
|
|
|
LOG.info("NEWLEADER proposal has Zxid of "
|
|
|
+ Long.toHexString(newLeaderProposal.packet.getZxid()));
|
|
|
}
|
|
|
+ outstandingProposals.put(newLeaderProposal.packet.getZxid(), newLeaderProposal);
|
|
|
+ newLeaderProposal.ackSet.add(self.getId());
|
|
|
|
|
|
waitForEpochAck(self.getId(), leaderStateSummary);
|
|
|
self.setCurrentEpoch(epoch);
|
|
@@ -386,24 +385,35 @@ public class Leader {
|
|
|
// We have to get at least a majority of servers in sync with
|
|
|
// us. We do this by waiting for the NEWLEADER packet to get
|
|
|
// acknowledged
|
|
|
- try {
|
|
|
- waitForNewLeaderAck(self.getId(), zk.getZxid(), LearnerType.PARTICIPANT);
|
|
|
- } catch (InterruptedException e) {
|
|
|
- shutdown("Waiting for a quorum of followers, only synced with sids: [ "
|
|
|
- + getSidSetString(newLeaderProposal.ackSet) + " ]");
|
|
|
- HashSet<Long> followerSet = new HashSet<Long>();
|
|
|
- for (LearnerHandler f : learners)
|
|
|
- followerSet.add(f.getSid());
|
|
|
-
|
|
|
- if (self.getQuorumVerifier().containsQuorum(followerSet)) {
|
|
|
- LOG.warn("Enough followers present. "
|
|
|
- + "Perhaps the initTicks need to be increased.");
|
|
|
+ while (!self.getQuorumVerifier().containsQuorum(newLeaderProposal.ackSet)){
|
|
|
+ //while (newLeaderProposal.ackCount <= self.quorumPeers.size() / 2) {
|
|
|
+ if (self.tick > self.initLimit) {
|
|
|
+ // Followers aren't syncing fast enough,
|
|
|
+ // renounce leadership!
|
|
|
+ StringBuilder ackToString = new StringBuilder();
|
|
|
+ for(Long id : newLeaderProposal.ackSet)
|
|
|
+ ackToString.append(id + ": ");
|
|
|
+
|
|
|
+ shutdown("Waiting for a quorum of followers, only synced with: " + ackToString);
|
|
|
+ HashSet<Long> followerSet = new HashSet<Long>();
|
|
|
+
|
|
|
+ for(LearnerHandler f : getLearners()) {
|
|
|
+ if (self.getQuorumVerifier().getVotingMembers().containsKey(f.getSid())){
|
|
|
+ followerSet.add(f.getSid());
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ if (self.getQuorumVerifier().containsQuorum(followerSet)) {
|
|
|
+ //if (followers.size() >= self.quorumPeers.size() / 2) {
|
|
|
+ LOG.warn("Enough followers present. "+
|
|
|
+ "Perhaps the initTicks need to be increased.");
|
|
|
+ }
|
|
|
+ return;
|
|
|
}
|
|
|
- return;
|
|
|
+ Thread.sleep(self.tickTime);
|
|
|
+ self.tick++;
|
|
|
}
|
|
|
|
|
|
- startZkServer();
|
|
|
-
|
|
|
/**
|
|
|
* WARNING: do not use this for anything other than QA testing
|
|
|
* on a real cluster. Specifically to enable verification that quorum
|
|
@@ -458,8 +468,9 @@ public class Leader {
|
|
|
if (!tickSkip && !self.getQuorumVerifier().containsQuorum(syncedSet)) {
|
|
|
//if (!tickSkip && syncedCount < self.quorumPeers.size() / 2) {
|
|
|
// Lost quorum, shutdown
|
|
|
- shutdown("Not sufficient followers synced, only synced with sids: [ "
|
|
|
- + getSidSetString(syncedSet) + " ]");
|
|
|
+ // TODO: message is wrong unless majority quorums used
|
|
|
+ shutdown("Only " + syncedSet.size() + " followers, need "
|
|
|
+ + (self.getVotingView().size() / 2));
|
|
|
// make sure the order is the same!
|
|
|
// the leader goes to looking
|
|
|
return;
|
|
@@ -533,15 +544,6 @@ public class Leader {
|
|
|
LOG.trace("outstanding proposals all");
|
|
|
}
|
|
|
|
|
|
- if ((zxid & 0xffffffffL) == 0) {
|
|
|
- /*
|
|
|
- * We no longer process NEWLEADER ack by this method. However,
|
|
|
- * the learner sends ack back to the leader after it gets UPTODATE
|
|
|
- * so we just ignore the message.
|
|
|
- */
|
|
|
- return;
|
|
|
- }
|
|
|
-
|
|
|
if (outstandingProposals.size() == 0) {
|
|
|
if (LOG.isDebugEnabled()) {
|
|
|
LOG.debug("outstanding is 0");
|
|
@@ -578,17 +580,26 @@ public class Leader {
|
|
|
if (p.request != null) {
|
|
|
toBeApplied.add(p);
|
|
|
}
|
|
|
-
|
|
|
- if (p.request == null) {
|
|
|
- LOG.warn("Going to commmit null request for proposal: {}", p);
|
|
|
- }
|
|
|
- commit(zxid);
|
|
|
- inform(p);
|
|
|
- zk.commitProcessor.commit(p.request);
|
|
|
- if(pendingSyncs.containsKey(zxid)){
|
|
|
- for(LearnerSyncRequest r: pendingSyncs.remove(zxid)) {
|
|
|
- sendSync(r);
|
|
|
+ // We don't commit the new leader proposal
|
|
|
+ if ((zxid & 0xffffffffL) != 0) {
|
|
|
+ if (p.request == null) {
|
|
|
+ LOG.warn("Going to commmit null request for proposal: {}", p);
|
|
|
+ }
|
|
|
+ commit(zxid);
|
|
|
+ inform(p);
|
|
|
+ zk.commitProcessor.commit(p.request);
|
|
|
+ if(pendingSyncs.containsKey(zxid)){
|
|
|
+ for(LearnerSyncRequest r: pendingSyncs.remove(zxid)) {
|
|
|
+ sendSync(r);
|
|
|
+ }
|
|
|
}
|
|
|
+ return;
|
|
|
+ } else {
|
|
|
+ lastCommitted = zxid;
|
|
|
+ LOG.info("Have quorum of supporters; starting up and setting last processed zxid: 0x{}",
|
|
|
+ Long.toHexString(zk.getZxid()));
|
|
|
+ zk.startup();
|
|
|
+ zk.getZKDatabase().setlastProcessedZxid(zk.getZxid());
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -912,86 +923,6 @@ public class Leader {
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
- /**
|
|
|
- * Return a list of sid in set as string
|
|
|
- */
|
|
|
- private String getSidSetString(Set<Long> sidSet) {
|
|
|
- StringBuilder sids = new StringBuilder();
|
|
|
- Iterator<Long> iter = sidSet.iterator();
|
|
|
- while (iter.hasNext()) {
|
|
|
- sids.append(iter.next());
|
|
|
- if (!iter.hasNext()) {
|
|
|
- break;
|
|
|
- }
|
|
|
- sids.append(",");
|
|
|
- }
|
|
|
- return sids.toString();
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Start up Leader ZooKeeper server and initialize zxid to the new epoch
|
|
|
- */
|
|
|
- private synchronized void startZkServer() {
|
|
|
- // Update lastCommitted and Db's zxid to a value representing the new epoch
|
|
|
- lastCommitted = zk.getZxid();
|
|
|
- LOG.info("Have quorum of supporters, sids: [ "
|
|
|
- + getSidSetString(newLeaderProposal.ackSet)
|
|
|
- + " ]; starting up and setting last processed zxid: 0x{}",
|
|
|
- Long.toHexString(zk.getZxid()));
|
|
|
- zk.startup();
|
|
|
- zk.getZKDatabase().setlastProcessedZxid(zk.getZxid());
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Process NEWLEADER ack of a given sid and wait until the leader receives
|
|
|
- * sufficient acks.
|
|
|
- *
|
|
|
- * @param sid
|
|
|
- * @param learnerType
|
|
|
- * @throws InterruptedException
|
|
|
- */
|
|
|
- public void waitForNewLeaderAck(long sid, long zxid, LearnerType learnerType)
|
|
|
- throws InterruptedException {
|
|
|
-
|
|
|
- synchronized (newLeaderProposal.ackSet) {
|
|
|
-
|
|
|
- if (quorumFormed) {
|
|
|
- return;
|
|
|
- }
|
|
|
-
|
|
|
- long currentZxid = newLeaderProposal.packet.getZxid();
|
|
|
- if (zxid != currentZxid) {
|
|
|
- LOG.error("NEWLEADER ACK from sid: " + sid
|
|
|
- + " is from a different epoch - current 0x"
|
|
|
- + Long.toHexString(currentZxid) + " receieved 0x"
|
|
|
- + Long.toHexString(zxid));
|
|
|
- return;
|
|
|
- }
|
|
|
-
|
|
|
- if (learnerType == LearnerType.PARTICIPANT) {
|
|
|
- newLeaderProposal.ackSet.add(sid);
|
|
|
- }
|
|
|
-
|
|
|
- if (self.getQuorumVerifier().containsQuorum(
|
|
|
- newLeaderProposal.ackSet)) {
|
|
|
- quorumFormed = true;
|
|
|
- newLeaderProposal.ackSet.notifyAll();
|
|
|
- } else {
|
|
|
- long start = System.currentTimeMillis();
|
|
|
- long cur = start;
|
|
|
- long end = start + self.getInitLimit() * self.getTickTime();
|
|
|
- while (!quorumFormed && cur < end) {
|
|
|
- newLeaderProposal.ackSet.wait(end - cur);
|
|
|
- cur = System.currentTimeMillis();
|
|
|
- }
|
|
|
- if (!quorumFormed) {
|
|
|
- throw new InterruptedException(
|
|
|
- "Timeout while waiting for NEWLEADER to be acked by quorum");
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
|
|
|
/**
|
|
|
* Get string representation of a given packet type
|