|
@@ -20,6 +20,7 @@ package org.apache.zookeeper.server.quorum;
|
|
|
|
|
|
import java.io.ByteArrayOutputStream;
|
|
import java.io.ByteArrayOutputStream;
|
|
import java.io.IOException;
|
|
import java.io.IOException;
|
|
|
|
+import java.lang.StringBuffer;
|
|
import java.net.BindException;
|
|
import java.net.BindException;
|
|
import java.net.ServerSocket;
|
|
import java.net.ServerSocket;
|
|
import java.net.Socket;
|
|
import java.net.Socket;
|
|
@@ -31,6 +32,7 @@ import java.util.HashMap;
|
|
import java.util.HashSet;
|
|
import java.util.HashSet;
|
|
import java.util.Iterator;
|
|
import java.util.Iterator;
|
|
import java.util.List;
|
|
import java.util.List;
|
|
|
|
+import java.util.concurrent.atomic.AtomicLong;
|
|
import java.util.concurrent.ConcurrentHashMap;
|
|
import java.util.concurrent.ConcurrentHashMap;
|
|
import java.util.concurrent.ConcurrentLinkedQueue;
|
|
import java.util.concurrent.ConcurrentLinkedQueue;
|
|
import java.util.concurrent.ConcurrentMap;
|
|
import java.util.concurrent.ConcurrentMap;
|
|
@@ -55,7 +57,7 @@ public class Leader {
|
|
static public class Proposal {
|
|
static public class Proposal {
|
|
public QuorumPacket packet;
|
|
public QuorumPacket packet;
|
|
|
|
|
|
- public int ackCount;
|
|
|
|
|
|
+ public HashSet<Long> ackSet = new HashSet<Long>();
|
|
|
|
|
|
public Request request;
|
|
public Request request;
|
|
|
|
|
|
@@ -80,7 +82,9 @@ public class Leader {
|
|
|
|
|
|
//Pending sync requests
|
|
//Pending sync requests
|
|
public HashMap<Long,List<FollowerSyncRequest>> pendingSyncs = new HashMap<Long,List<FollowerSyncRequest>>();
|
|
public HashMap<Long,List<FollowerSyncRequest>> pendingSyncs = new HashMap<Long,List<FollowerSyncRequest>>();
|
|
-
|
|
|
|
|
|
+
|
|
|
|
+ //Follower counter
|
|
|
|
+ AtomicLong followerCounter = new AtomicLong(-1);
|
|
/**
|
|
/**
|
|
* Adds follower to the leader.
|
|
* Adds follower to the leader.
|
|
*
|
|
*
|
|
@@ -149,10 +153,10 @@ public class Leader {
|
|
final static int NEWLEADER = 10;
|
|
final static int NEWLEADER = 10;
|
|
|
|
|
|
/**
|
|
/**
|
|
- * This message type is sent by a follower to indicate the last zxid in its
|
|
|
|
- * log.
|
|
|
|
|
|
+ * This message type is sent by a follower to pass the last zxid. This is here
|
|
|
|
+ * for backward compatibility purposes.
|
|
*/
|
|
*/
|
|
- final static int LASTZXID = 11;
|
|
|
|
|
|
+ final static int FOLLOWERINFO = 11;
|
|
|
|
|
|
/**
|
|
/**
|
|
* This message type is sent by the leader to indicate that the follower is
|
|
* This message type is sent by the leader to indicate that the follower is
|
|
@@ -198,7 +202,7 @@ public class Leader {
|
|
* between the leader and the follower.
|
|
* between the leader and the follower.
|
|
*/
|
|
*/
|
|
final static int SYNC = 7;
|
|
final static int SYNC = 7;
|
|
-
|
|
|
|
|
|
+
|
|
private ConcurrentMap<Long, Proposal> outstandingProposals = new ConcurrentHashMap<Long, Proposal>();
|
|
private ConcurrentMap<Long, Proposal> outstandingProposals = new ConcurrentHashMap<Long, Proposal>();
|
|
|
|
|
|
ConcurrentLinkedQueue<Proposal> toBeApplied = new ConcurrentLinkedQueue<Proposal>();
|
|
ConcurrentLinkedQueue<Proposal> toBeApplied = new ConcurrentLinkedQueue<Proposal>();
|
|
@@ -275,15 +279,23 @@ public class Leader {
|
|
// We have to get at least a majority of servers in sync with
|
|
// We have to get at least a majority of servers in sync with
|
|
// us. We do this by waiting for the NEWLEADER packet to get
|
|
// us. We do this by waiting for the NEWLEADER packet to get
|
|
// acknowledged
|
|
// acknowledged
|
|
- newLeaderProposal.ackCount++;
|
|
|
|
- while (newLeaderProposal.ackCount <= self.quorumPeers.size() / 2) {
|
|
|
|
|
|
+ newLeaderProposal.ackSet.add(self.getId());
|
|
|
|
+ while (!self.getQuorumVerifier().containsQuorum(newLeaderProposal.ackSet)){
|
|
|
|
+ //while (newLeaderProposal.ackCount <= self.quorumPeers.size() / 2) {
|
|
if (self.tick > self.initLimit) {
|
|
if (self.tick > self.initLimit) {
|
|
// Followers aren't syncing fast enough,
|
|
// Followers aren't syncing fast enough,
|
|
// renounce leadership!
|
|
// renounce leadership!
|
|
- shutdown("Waiting for " + (self.quorumPeers.size() / 2)
|
|
|
|
- + " followers, only synced with "
|
|
|
|
- + newLeaderProposal.ackCount);
|
|
|
|
- if (followers.size() >= self.quorumPeers.size() / 2) {
|
|
|
|
|
|
+ StringBuffer ackToString = new StringBuffer();
|
|
|
|
+ for(Long id : newLeaderProposal.ackSet)
|
|
|
|
+ ackToString.append(id + ": ");
|
|
|
|
+
|
|
|
|
+ shutdown("Waiting for a quorum of followers, only synced with: " + ackToString);
|
|
|
|
+ HashSet<Long> followerSet = new HashSet<Long>();
|
|
|
|
+ for(FollowerHandler f : followers)
|
|
|
|
+ followerSet.add(f.getSid());
|
|
|
|
+
|
|
|
|
+ if (self.getQuorumVerifier().containsQuorum(followerSet)) {
|
|
|
|
+ //if (followers.size() >= self.quorumPeers.size() / 2) {
|
|
LOG.warn("Enough followers present. "+
|
|
LOG.warn("Enough followers present. "+
|
|
"Perhaps the initTicks need to be increased.");
|
|
"Perhaps the initTicks need to be increased.");
|
|
}
|
|
}
|
|
@@ -312,24 +324,29 @@ public class Leader {
|
|
self.tick++;
|
|
self.tick++;
|
|
}
|
|
}
|
|
int syncedCount = 0;
|
|
int syncedCount = 0;
|
|
|
|
+ HashSet<Long> syncedSet = new HashSet<Long>();
|
|
|
|
+
|
|
// lock on the followers when we use it.
|
|
// lock on the followers when we use it.
|
|
|
|
+ syncedSet.add(self.getId());
|
|
synchronized (followers) {
|
|
synchronized (followers) {
|
|
for (FollowerHandler f : followers) {
|
|
for (FollowerHandler f : followers) {
|
|
if (f.synced()) {
|
|
if (f.synced()) {
|
|
syncedCount++;
|
|
syncedCount++;
|
|
|
|
+ syncedSet.add(f.getSid());
|
|
}
|
|
}
|
|
f.ping();
|
|
f.ping();
|
|
}
|
|
}
|
|
}
|
|
}
|
|
- if (!tickSkip && syncedCount < self.quorumPeers.size() / 2) {
|
|
|
|
|
|
+ if (!tickSkip && !self.getQuorumVerifier().containsQuorum(syncedSet)) {
|
|
|
|
+ //if (!tickSkip && syncedCount < self.quorumPeers.size() / 2) {
|
|
// Lost quorum, shutdown
|
|
// Lost quorum, shutdown
|
|
shutdown("Only " + syncedCount + " followers, need "
|
|
shutdown("Only " + syncedCount + " followers, need "
|
|
+ (self.quorumPeers.size() / 2));
|
|
+ (self.quorumPeers.size() / 2));
|
|
// make sure the order is the same!
|
|
// make sure the order is the same!
|
|
// the leader goes to looking
|
|
// the leader goes to looking
|
|
return;
|
|
return;
|
|
- }
|
|
|
|
- tickSkip = !tickSkip;
|
|
|
|
|
|
+ }
|
|
|
|
+ tickSkip = !tickSkip;
|
|
}
|
|
}
|
|
} finally {
|
|
} finally {
|
|
zk.unregisterJMX(this);
|
|
zk.unregisterJMX(this);
|
|
@@ -385,7 +402,7 @@ public class Leader {
|
|
* the zxid of the proposal sent out
|
|
* the zxid of the proposal sent out
|
|
* @param followerAddr
|
|
* @param followerAddr
|
|
*/
|
|
*/
|
|
- synchronized public void processAck(long zxid, SocketAddress followerAddr) {
|
|
|
|
|
|
+ synchronized public void processAck(long sid, long zxid, SocketAddress followerAddr) {
|
|
boolean first = true;
|
|
boolean first = true;
|
|
|
|
|
|
if (LOG.isDebugEnabled()) {
|
|
if (LOG.isDebugEnabled()) {
|
|
@@ -419,19 +436,18 @@ public class Leader {
|
|
+ Long.toHexString(zxid) + " from " + followerAddr);
|
|
+ Long.toHexString(zxid) + " from " + followerAddr);
|
|
return;
|
|
return;
|
|
}
|
|
}
|
|
- p.ackCount++;
|
|
|
|
|
|
+
|
|
|
|
+ p.ackSet.add(sid);
|
|
if (LOG.isDebugEnabled()) {
|
|
if (LOG.isDebugEnabled()) {
|
|
LOG.debug("Count for zxid: 0x" + Long.toHexString(zxid)
|
|
LOG.debug("Count for zxid: 0x" + Long.toHexString(zxid)
|
|
- + " is " + p.ackCount);
|
|
|
|
|
|
+ + " is " + p.ackSet.size());
|
|
}
|
|
}
|
|
-
|
|
|
|
- if (p.ackCount > self.quorumPeers.size() / 2){
|
|
|
|
|
|
+ if (self.getQuorumVerifier().containsQuorum(p.ackSet)){
|
|
if (zxid != lastCommitted+1) {
|
|
if (zxid != lastCommitted+1) {
|
|
LOG.warn("Commiting zxid 0x" + Long.toHexString(zxid)
|
|
LOG.warn("Commiting zxid 0x" + Long.toHexString(zxid)
|
|
+ " from " + followerAddr + " not first!");
|
|
+ " from " + followerAddr + " not first!");
|
|
LOG.warn("First is "
|
|
LOG.warn("First is "
|
|
+ (lastCommitted+1));
|
|
+ (lastCommitted+1));
|
|
- //System.exit(13);
|
|
|
|
}
|
|
}
|
|
outstandingProposals.remove(zxid);
|
|
outstandingProposals.remove(zxid);
|
|
if (p.request != null) {
|
|
if (p.request != null) {
|