|
@@ -33,6 +33,7 @@ import java.util.HashMap;
|
|
import java.util.HashSet;
|
|
import java.util.HashSet;
|
|
import java.util.Iterator;
|
|
import java.util.Iterator;
|
|
import java.util.List;
|
|
import java.util.List;
|
|
|
|
+import java.util.Set;
|
|
import java.util.concurrent.atomic.AtomicLong;
|
|
import java.util.concurrent.atomic.AtomicLong;
|
|
import java.util.concurrent.ConcurrentHashMap;
|
|
import java.util.concurrent.ConcurrentHashMap;
|
|
import java.util.concurrent.ConcurrentLinkedQueue;
|
|
import java.util.concurrent.ConcurrentLinkedQueue;
|
|
@@ -96,6 +97,16 @@ public class Leader {
|
|
}
|
|
}
|
|
return true;
|
|
return true;
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+ public String ackSetsToString(){
|
|
|
|
+ StringBuilder sb = new StringBuilder();
|
|
|
|
+
|
|
|
|
+ for (QuorumVerifierAcksetPair qvAckset : qvAcksetPairs) {
|
|
|
|
+ sb.append(qvAckset.getAckset().toString()).append(",");
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ return sb.substring(0, sb.length()-1);
|
|
|
|
+ }
|
|
|
|
|
|
public static class QuorumVerifierAcksetPair {
|
|
public static class QuorumVerifierAcksetPair {
|
|
private final QuorumVerifier _qv;
|
|
private final QuorumVerifier _qv;
|
|
@@ -120,7 +131,8 @@ public class Leader {
|
|
|
|
|
|
final QuorumPeer self;
|
|
final QuorumPeer self;
|
|
|
|
|
|
-
|
|
|
|
|
|
+ private boolean quorumFormed = false;
|
|
|
|
+
|
|
// the follower acceptor thread
|
|
// the follower acceptor thread
|
|
volatile LearnerCnxAcceptor cnxAcceptor = null;
|
|
volatile LearnerCnxAcceptor cnxAcceptor = null;
|
|
|
|
|
|
@@ -461,10 +473,7 @@ public class Leader {
|
|
if (self.getLastSeenQuorumVerifier().getVersion() > self.getQuorumVerifier().getVersion()){
|
|
if (self.getLastSeenQuorumVerifier().getVersion() > self.getQuorumVerifier().getVersion()){
|
|
newLeaderProposal.addQuorumVerifier(self.getLastSeenQuorumVerifier());
|
|
newLeaderProposal.addQuorumVerifier(self.getLastSeenQuorumVerifier());
|
|
}
|
|
}
|
|
- newLeaderProposal.addAck(self.getId());
|
|
|
|
|
|
|
|
- outstandingProposals.put(newLeaderProposal.packet.getZxid(), newLeaderProposal);
|
|
|
|
- LOG.debug("put newleader into outstanding proposals");
|
|
|
|
// We have to get at least a majority of servers in sync with
|
|
// We have to get at least a majority of servers in sync with
|
|
// us. We do this by waiting for the NEWLEADER packet to get
|
|
// us. We do this by waiting for the NEWLEADER packet to get
|
|
// acknowledged
|
|
// acknowledged
|
|
@@ -472,47 +481,34 @@ public class Leader {
|
|
waitForEpochAck(self.getId(), leaderStateSummary);
|
|
waitForEpochAck(self.getId(), leaderStateSummary);
|
|
self.setCurrentEpoch(epoch);
|
|
self.setCurrentEpoch(epoch);
|
|
|
|
|
|
- while (!newLeaderProposal.hasAllQuorums()){
|
|
|
|
- //while (newLeaderProposal.ackCount <= self.quorumPeers.size() / 2) {
|
|
|
|
- if (self.tick > self.initLimit) {
|
|
|
|
- // Followers aren't syncing fast enough,
|
|
|
|
- // renounce leadership!
|
|
|
|
- StringBuilder ackToString = new StringBuilder();
|
|
|
|
-
|
|
|
|
- for (Proposal.QuorumVerifierAcksetPair qvAckset:newLeaderProposal.qvAcksetPairs) {
|
|
|
|
- if (ackToString.length() > 0) ackToString.append('\n');
|
|
|
|
- if (!qvAckset.getQuorumVerifier().containsQuorum(qvAckset.getAckset())) {
|
|
|
|
- ackToString.append("Configuration " + qvAckset.getQuorumVerifier().getVersion() + ": waiting for a quorum of followers, only synced with: ");
|
|
|
|
- for(Long id : qvAckset.getAckset())
|
|
|
|
- ackToString.append(id + " ");
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- shutdown(ackToString.toString());
|
|
|
|
- HashSet<Long> followerSet = new HashSet<Long>();
|
|
|
|
-
|
|
|
|
- for(LearnerHandler f : getLearners()) {
|
|
|
|
- if (self.getQuorumVerifier().getVotingMembers().containsKey(f.getSid())){
|
|
|
|
- followerSet.add(f.getSid());
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- boolean initTicksShouldBeIncreased = true;
|
|
|
|
- for (Proposal.QuorumVerifierAcksetPair qvAckset:newLeaderProposal.qvAcksetPairs) {
|
|
|
|
- if (!qvAckset.getQuorumVerifier().containsQuorum(followerSet)) {
|
|
|
|
- initTicksShouldBeIncreased = false;
|
|
|
|
- break;
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- if (initTicksShouldBeIncreased) {
|
|
|
|
- LOG.warn("Enough followers present. "+
|
|
|
|
- "Perhaps the initTicks need to be increased.");
|
|
|
|
- }
|
|
|
|
- return;
|
|
|
|
- }
|
|
|
|
- Thread.sleep(self.tickTime);
|
|
|
|
- self.tick++;
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
|
|
+ try {
|
|
|
|
+ waitForNewLeaderAck(self.getId(), zk.getZxid(), LearnerType.PARTICIPANT);
|
|
|
|
+ } catch (InterruptedException e) {
|
|
|
|
+ shutdown("Waiting for a quorum of followers, only synced with sids: [ "
|
|
|
|
+ + newLeaderProposal.ackSetsToString() + " ]");
|
|
|
|
+ HashSet<Long> followerSet = new HashSet<Long>();
|
|
|
|
+
|
|
|
|
+ for(LearnerHandler f : getLearners()) {
|
|
|
|
+ if (self.getQuorumVerifier().getVotingMembers().containsKey(f.getSid())){
|
|
|
|
+ followerSet.add(f.getSid());
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ boolean initTicksShouldBeIncreased = true;
|
|
|
|
+ for (Proposal.QuorumVerifierAcksetPair qvAckset:newLeaderProposal.qvAcksetPairs) {
|
|
|
|
+ if (!qvAckset.getQuorumVerifier().containsQuorum(followerSet)) {
|
|
|
|
+ initTicksShouldBeIncreased = false;
|
|
|
|
+ break;
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ if (initTicksShouldBeIncreased) {
|
|
|
|
+ LOG.warn("Enough followers present. "+
|
|
|
|
+ "Perhaps the initTicks need to be increased.");
|
|
|
|
+ }
|
|
|
|
+ return;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ startZkServer();
|
|
|
|
+
|
|
/**
|
|
/**
|
|
* WARNING: do not use this for anything other than QA testing
|
|
* WARNING: do not use this for anything other than QA testing
|
|
* on a real cluster. Specifically to enable verification that quorum
|
|
* on a real cluster. Specifically to enable verification that quorum
|
|
@@ -565,14 +561,13 @@ public class Leader {
|
|
}
|
|
}
|
|
|
|
|
|
if (!tickSkip && !self.getQuorumVerifier().containsQuorum(syncedSet)) {
|
|
if (!tickSkip && !self.getQuorumVerifier().containsQuorum(syncedSet)) {
|
|
- //if (!tickSkip && syncedCount < self.quorumPeers.size() / 2) {
|
|
|
|
- // Lost quorum, shutdown
|
|
|
|
- // TODO: message is wrong unless majority quorums used
|
|
|
|
- shutdown("Only " + syncedSet.size() + " voting followers, need "
|
|
|
|
- + (self.getVotingView().size() / 2));
|
|
|
|
- // make sure the order is the same!
|
|
|
|
- // the leader goes to looking
|
|
|
|
- return;
|
|
|
|
|
|
+ //if (!tickSkip && syncedCount < self.quorumPeers.size() / 2) {
|
|
|
|
+ // Lost quorum, shutdown
|
|
|
|
+ shutdown("Not sufficient followers synced, only synced with sids: [ "
|
|
|
|
+ + getSidSetString(syncedSet) + " ]");
|
|
|
|
+ // make sure the order is the same!
|
|
|
|
+ // the leader goes to looking
|
|
|
|
+ return;
|
|
}
|
|
}
|
|
tickSkip = !tickSkip;
|
|
tickSkip = !tickSkip;
|
|
}
|
|
}
|
|
@@ -708,63 +703,44 @@ public class Leader {
|
|
if (p.request != null) {
|
|
if (p.request != null) {
|
|
toBeApplied.add(p);
|
|
toBeApplied.add(p);
|
|
}
|
|
}
|
|
-
|
|
|
|
- // We don't commit the new leader proposal
|
|
|
|
- if ((zxid & 0xffffffffL) != 0) {
|
|
|
|
- if (p.request == null) {
|
|
|
|
- LOG.warn("Going to commmit null: " + p);
|
|
|
|
- } else if (p.request.getHdr().getType() == OpCode.reconfig) {
|
|
|
|
- LOG.debug("Committing a reconfiguration! " + outstandingProposals.size());
|
|
|
|
|
|
+
|
|
|
|
+ if (p.request == null) {
|
|
|
|
+ LOG.warn("Going to commmit null: " + p);
|
|
|
|
+ } else if (p.request.getHdr().getType() == OpCode.reconfig) {
|
|
|
|
+ LOG.debug("Committing a reconfiguration! " + outstandingProposals.size());
|
|
|
|
|
|
- //if this server is voter in new config with the same quorum address,
|
|
|
|
- //then it will remain the leader
|
|
|
|
- //otherwise an up-to-date follower will be designated as leader. This saves
|
|
|
|
- //leader election time, unless the designated leader fails
|
|
|
|
- Long designatedLeader = getDesignatedLeader(p, zxid);
|
|
|
|
- //LOG.warn("designated leader is: " + designatedLeader);
|
|
|
|
-
|
|
|
|
- QuorumVerifier newQV = p.qvAcksetPairs.get(p.qvAcksetPairs.size()-1).getQuorumVerifier();
|
|
|
|
-
|
|
|
|
- self.processReconfig(newQV, designatedLeader, zk.getZxid(), true);
|
|
|
|
-
|
|
|
|
- if (designatedLeader != self.getId()) {
|
|
|
|
- allowedToCommit = false;
|
|
|
|
- }
|
|
|
|
|
|
+ //if this server is voter in new config with the same quorum address,
|
|
|
|
+ //then it will remain the leader
|
|
|
|
+ //otherwise an up-to-date follower will be designated as leader. This saves
|
|
|
|
+ //leader election time, unless the designated leader fails
|
|
|
|
+ Long designatedLeader = getDesignatedLeader(p, zxid);
|
|
|
|
+ //LOG.warn("designated leader is: " + designatedLeader);
|
|
|
|
+
|
|
|
|
+ QuorumVerifier newQV = p.qvAcksetPairs.get(p.qvAcksetPairs.size()-1).getQuorumVerifier();
|
|
|
|
+
|
|
|
|
+ self.processReconfig(newQV, designatedLeader, zk.getZxid(), true);
|
|
|
|
+
|
|
|
|
+ if (designatedLeader != self.getId()) {
|
|
|
|
+ allowedToCommit = false;
|
|
|
|
+ }
|
|
|
|
|
|
- // we're sending the designated leader, and if the leader is changing the followers are
|
|
|
|
- // responsible for closing the connection - this way we are sure that at least a majority of them
|
|
|
|
- // receive the commit message.
|
|
|
|
- commitAndActivate(zxid, designatedLeader);
|
|
|
|
- informAndActivate(p, designatedLeader);
|
|
|
|
- //turnOffFollowers();
|
|
|
|
- } else {
|
|
|
|
- commit(zxid);
|
|
|
|
- inform(p);
|
|
|
|
- }
|
|
|
|
- zk.commitProcessor.commit(p.request);
|
|
|
|
- if(pendingSyncs.containsKey(zxid)){
|
|
|
|
- for(LearnerSyncRequest r: pendingSyncs.remove(zxid)) {
|
|
|
|
- sendSync(r);
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
|
|
+ // we're sending the designated leader, and if the leader is changing the followers are
|
|
|
|
+ // responsible for closing the connection - this way we are sure that at least a majority of them
|
|
|
|
+ // receive the commit message.
|
|
|
|
+ commitAndActivate(zxid, designatedLeader);
|
|
|
|
+ informAndActivate(p, designatedLeader);
|
|
|
|
+ //turnOffFollowers();
|
|
} else {
|
|
} else {
|
|
- lastCommitted = zxid;
|
|
|
|
- if(LOG.isInfoEnabled()){
|
|
|
|
- LOG.info("Have quorum of supporters; starting up and setting last processed zxid: " + zk.getZxid());
|
|
|
|
- }
|
|
|
|
- QuorumVerifier newQV = self.getLastSeenQuorumVerifier();
|
|
|
|
-
|
|
|
|
- Long designatedLeader = getDesignatedLeader(p, zxid);
|
|
|
|
-
|
|
|
|
- self.processReconfig(newQV, designatedLeader, zk.getZxid(), true);
|
|
|
|
- if (designatedLeader != self.getId()) {
|
|
|
|
- allowedToCommit = false;
|
|
|
|
- }
|
|
|
|
- LOG.debug("GOT QUORUM of ACKS FOR NEWLEADER msg " + allowedToCommit);
|
|
|
|
- zk.startup();
|
|
|
|
- zk.getZKDatabase().setlastProcessedZxid(zk.getZxid());
|
|
|
|
-
|
|
|
|
|
|
+ commit(zxid);
|
|
|
|
+ inform(p);
|
|
}
|
|
}
|
|
|
|
+ zk.commitProcessor.commit(p.request);
|
|
|
|
+ if(pendingSyncs.containsKey(zxid)){
|
|
|
|
+ for(LearnerSyncRequest r: pendingSyncs.remove(zxid)) {
|
|
|
|
+ sendSync(r);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
return true;
|
|
return true;
|
|
}
|
|
}
|
|
|
|
|
|
@@ -788,6 +764,17 @@ public class Leader {
|
|
}
|
|
}
|
|
LOG.trace("outstanding proposals all");
|
|
LOG.trace("outstanding proposals all");
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+ if ((zxid & 0xffffffffL) == 0) {
|
|
|
|
+ /*
|
|
|
|
+ * We no longer process NEWLEADER ack with this method. However,
|
|
|
|
+ * the learner sends an ack back to the leader after it gets
|
|
|
|
+ * UPTODATE, so we just ignore the message.
|
|
|
|
+ */
|
|
|
|
+ return;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+
|
|
if (outstandingProposals.size() == 0) {
|
|
if (outstandingProposals.size() == 0) {
|
|
if (LOG.isDebugEnabled()) {
|
|
if (LOG.isDebugEnabled()) {
|
|
LOG.debug("outstanding is 0");
|
|
LOG.debug("outstanding is 0");
|
|
@@ -1197,6 +1184,104 @@ public class Leader {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Return a list of sid in set as string
|
|
|
|
+ */
|
|
|
|
+ private String getSidSetString(Set<Long> sidSet) {
|
|
|
|
+ StringBuilder sids = new StringBuilder();
|
|
|
|
+ Iterator<Long> iter = sidSet.iterator();
|
|
|
|
+ while (iter.hasNext()) {
|
|
|
|
+ sids.append(iter.next());
|
|
|
|
+ if (!iter.hasNext()) {
|
|
|
|
+ break;
|
|
|
|
+ }
|
|
|
|
+ sids.append(",");
|
|
|
|
+ }
|
|
|
|
+ return sids.toString();
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Start up Leader ZooKeeper server and initialize zxid to the new epoch
|
|
|
|
+ */
|
|
|
|
+ private synchronized void startZkServer() {
|
|
|
|
+ // Update lastCommitted and Db's zxid to a value representing the new epoch
|
|
|
|
+ lastCommitted = zk.getZxid();
|
|
|
|
+ LOG.info("Have quorum of supporters, sids: [ "
|
|
|
|
+ + newLeaderProposal.ackSetsToString()
|
|
|
|
+ + " ]; starting up and setting last processed zxid: 0x{}",
|
|
|
|
+ Long.toHexString(zk.getZxid()));
|
|
|
|
+
|
|
|
|
+ /*
|
|
|
|
+ * ZOOKEEPER-1324. the leader sends the new config it must complete
|
|
|
|
+ * to others inside a NEWLEADER message (see LearnerHandler where
|
|
|
|
+ * the NEWLEADER message is constructed), and once it has enough
|
|
|
|
+ * acks we must execute the following code so that it applies the
|
|
|
|
+ * config to itself.
|
|
|
|
+ */
|
|
|
|
+ QuorumVerifier newQV = self.getLastSeenQuorumVerifier();
|
|
|
|
+
|
|
|
|
+ Long designatedLeader = getDesignatedLeader(newLeaderProposal, zk.getZxid());
|
|
|
|
+
|
|
|
|
+ self.processReconfig(newQV, designatedLeader, zk.getZxid(), true);
|
|
|
|
+ if (designatedLeader != self.getId()) {
|
|
|
|
+ allowedToCommit = false;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ zk.startup();
|
|
|
|
+ zk.getZKDatabase().setlastProcessedZxid(zk.getZxid());
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Process NEWLEADER ack of a given sid and wait until the leader receives
|
|
|
|
+ * sufficient acks.
|
|
|
|
+ *
|
|
|
|
+ * @param sid
|
|
|
|
+ * @param learnerType
|
|
|
|
+ * @throws InterruptedException
|
|
|
|
+ */
|
|
|
|
+ public void waitForNewLeaderAck(long sid, long zxid, LearnerType learnerType)
|
|
|
|
+ throws InterruptedException {
|
|
|
|
+
|
|
|
|
+ synchronized (newLeaderProposal.qvAcksetPairs) {
|
|
|
|
+
|
|
|
|
+ if (quorumFormed) {
|
|
|
|
+ return;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ long currentZxid = newLeaderProposal.packet.getZxid();
|
|
|
|
+ if (zxid != currentZxid) {
|
|
|
|
+ LOG.error("NEWLEADER ACK from sid: " + sid
|
|
|
|
+ + " is from a different epoch - current 0x"
|
|
|
|
+ + Long.toHexString(currentZxid) + " receieved 0x"
|
|
|
|
+ + Long.toHexString(zxid));
|
|
|
|
+ return;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /*
|
|
|
|
+ * Note that addAck already checks that the learner
|
|
|
|
+ * is a PARTICIPANT.
|
|
|
|
+ */
|
|
|
|
+ newLeaderProposal.addAck(sid);
|
|
|
|
+
|
|
|
|
+ if (newLeaderProposal.hasAllQuorums()) {
|
|
|
|
+ quorumFormed = true;
|
|
|
|
+ newLeaderProposal.qvAcksetPairs.notifyAll();
|
|
|
|
+ } else {
|
|
|
|
+ long start = System.currentTimeMillis();
|
|
|
|
+ long cur = start;
|
|
|
|
+ long end = start + self.getInitLimit() * self.getTickTime();
|
|
|
|
+ while (!quorumFormed && cur < end) {
|
|
|
|
+ newLeaderProposal.qvAcksetPairs.wait(end - cur);
|
|
|
|
+ cur = System.currentTimeMillis();
|
|
|
|
+ }
|
|
|
|
+ if (!quorumFormed) {
|
|
|
|
+ throw new InterruptedException(
|
|
|
|
+ "Timeout while waiting for NEWLEADER to be acked by quorum");
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
|
|
/**
|
|
/**
|
|
* Get string representation of a given packet type
|
|
* Get string representation of a given packet type
|