|
@@ -26,11 +26,14 @@ import java.net.Socket;
|
|
|
import java.net.SocketAddress;
|
|
|
import java.net.SocketException;
|
|
|
import java.util.ArrayList;
|
|
|
+import java.util.Collections;
|
|
|
import java.util.HashMap;
|
|
|
import java.util.HashSet;
|
|
|
import java.util.Iterator;
|
|
|
import java.util.List;
|
|
|
+import java.util.concurrent.ConcurrentHashMap;
|
|
|
import java.util.concurrent.ConcurrentLinkedQueue;
|
|
|
+import java.util.concurrent.ConcurrentMap;
|
|
|
|
|
|
import org.apache.jute.BinaryOutputArchive;
|
|
|
import org.apache.log4j.Logger;
|
|
@@ -43,6 +46,11 @@ import org.apache.zookeeper.server.RequestProcessor;
|
|
|
*/
|
|
|
public class Leader {
|
|
|
private static final Logger LOG = Logger.getLogger(Leader.class);
|
|
|
+
|
|
|
+ static final private boolean nodelay = System.getProperty("leader.nodelay", "true").equals("true"); // value passed to setTcpNoDelay() on accepted sockets; read once from system property "leader.nodelay", defaulting to "true"; any value other than the exact string "true" yields false
|
|
|
+ static { // runs once at class initialization
|
|
|
+ LOG.info("TCP NoDelay set to: " + nodelay); // record the effective setting in the log
|
|
|
+ }
|
|
|
|
|
|
static public class Proposal {
|
|
|
public QuorumPacket packet;
|
|
@@ -191,7 +199,7 @@ public class Leader {
|
|
|
*/
|
|
|
final static int SYNC = 7;
|
|
|
|
|
|
- private ConcurrentLinkedQueue<Proposal> outstandingProposals = new ConcurrentLinkedQueue<Proposal>();
|
|
|
+ private ConcurrentMap<Long, Proposal> outstandingProposals = new ConcurrentHashMap<Long, Proposal>(); // proposals keyed by their packet zxid; entries are looked up by zxid when an ack arrives and removed once the ack count exceeds half of the quorum peers. The map has no ordering, so callers that need zxid order must sort the keys themselves.
|
|
|
|
|
|
ConcurrentLinkedQueue<Proposal> toBeApplied = new ConcurrentLinkedQueue<Proposal>();
|
|
|
|
|
@@ -207,7 +215,7 @@ public class Leader {
|
|
|
try{
|
|
|
Socket s = ss.accept();
|
|
|
s.setSoTimeout(self.tickTime * self.syncLimit);
|
|
|
- s.setTcpNoDelay(true);
|
|
|
+ s.setTcpNoDelay(nodelay);
|
|
|
new FollowerHandler(s, Leader.this);
|
|
|
} catch (SocketException e) {
|
|
|
if (stop) {
|
|
@@ -257,7 +265,7 @@ public class Leader {
|
|
|
LOG.info("NEWLEADER proposal has Zxid of "
|
|
|
+ Long.toHexString(newLeaderProposal.packet.getZxid()));
|
|
|
}
|
|
|
- outstandingProposals.add(newLeaderProposal);
|
|
|
+ outstandingProposals.put(newLeaderProposal.packet.getZxid(), newLeaderProposal);
|
|
|
|
|
|
// Start thread that waits for connection requests from
|
|
|
// new followers.
|
|
@@ -341,7 +349,9 @@ public class Leader {
|
|
|
LOG.info("Shutdown called",
|
|
|
new Exception("shutdown Leader! reason: " + reason));
|
|
|
|
|
|
- cnxAcceptor.halt();
|
|
|
+ if (cnxAcceptor != null) {
|
|
|
+ cnxAcceptor.halt();
|
|
|
+ }
|
|
|
|
|
|
// NIO should not accept connections
|
|
|
self.cnxnFactory.setZooKeeperServer(null);
|
|
@@ -380,7 +390,7 @@ public class Leader {
|
|
|
|
|
|
if (LOG.isDebugEnabled()) {
|
|
|
LOG.debug("Ack zxid: 0x" + Long.toHexString(zxid));
|
|
|
- for (Proposal p : outstandingProposals) {
|
|
|
+ for (Proposal p : outstandingProposals.values()) {
|
|
|
long packetZxid = p.packet.getZxid();
|
|
|
LOG.debug("outstanding proposal: 0x"
|
|
|
+ Long.toHexString(packetZxid));
|
|
@@ -394,57 +404,56 @@ public class Leader {
|
|
|
}
|
|
|
return;
|
|
|
}
|
|
|
- if (outstandingProposals.peek().packet.getZxid() > zxid) {
|
|
|
+ if (lastCommitted >= zxid) {
|
|
|
if (LOG.isDebugEnabled()) {
|
|
|
LOG.debug("proposal has already been committed, pzxid:"
|
|
|
- + outstandingProposals.peek().packet.getZxid()
|
|
|
- + " zxid:" + zxid);
|
|
|
+ + lastCommitted
|
|
|
+ + " zxid: 0x" + Long.toHexString(zxid));
|
|
|
}
|
|
|
// The proposal has already been committed
|
|
|
return;
|
|
|
}
|
|
|
- for (Proposal p : outstandingProposals) {
|
|
|
- long packetZxid = p.packet.getZxid();
|
|
|
- if (packetZxid == zxid) {
|
|
|
- p.ackCount++;
|
|
|
- if (LOG.isDebugEnabled()) {
|
|
|
- LOG.debug("Count for zxid: 0x" + Long.toHexString(zxid)
|
|
|
- + " is " + p.ackCount);
|
|
|
- }
|
|
|
+ Proposal p = outstandingProposals.get(zxid);
|
|
|
+ if (p == null) {
|
|
|
+ LOG.warn("Trying to commit future proposal: zxid 0x"
|
|
|
+ + Long.toHexString(zxid) + " from " + followerAddr);
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ p.ackCount++;
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
+ LOG.debug("Count for zxid: 0x" + Long.toHexString(zxid)
|
|
|
+ + " is " + p.ackCount);
|
|
|
+ }
|
|
|
|
|
|
- if (p.ackCount > self.quorumPeers.size() / 2){
|
|
|
- if (!first) {
|
|
|
- LOG.fatal("Commiting zxid 0x" + Long.toHexString(zxid)
|
|
|
- + " from " + followerAddr + " not first!");
|
|
|
- LOG.fatal("First is "
|
|
|
- + outstandingProposals.element().packet);
|
|
|
- System.exit(13);
|
|
|
- }
|
|
|
- outstandingProposals.remove();
|
|
|
- if (p.request != null) {
|
|
|
- toBeApplied.add(p);
|
|
|
- }
|
|
|
- // We don't commit the new leader proposal
|
|
|
- if ((zxid & 0xffffffffL) != 0) {
|
|
|
- if (p.request == null) {
|
|
|
- LOG.warn("Going to commmit null: " + p);
|
|
|
- }
|
|
|
- commit(zxid);
|
|
|
- zk.commitProcessor.commit(p.request);
|
|
|
- if(pendingSyncs.containsKey(zxid)){
|
|
|
- for(FollowerSyncRequest r: pendingSyncs.remove(zxid)) {
|
|
|
- sendSync(r);
|
|
|
- }
|
|
|
- }
|
|
|
+ if (p.ackCount > self.quorumPeers.size() / 2){
|
|
|
+ if (zxid != lastCommitted+1) {
|
|
|
+ LOG.warn("Commiting zxid 0x" + Long.toHexString(zxid)
|
|
|
+ + " from " + followerAddr + " not first!");
|
|
|
+ LOG.warn("First is "
|
|
|
+ + (lastCommitted+1));
|
|
|
+ //System.exit(13);
|
|
|
+ }
|
|
|
+ outstandingProposals.remove(zxid);
|
|
|
+ if (p.request != null) {
|
|
|
+ toBeApplied.add(p);
|
|
|
+ }
|
|
|
+ // We don't commit the new leader proposal
|
|
|
+ if ((zxid & 0xffffffffL) != 0) {
|
|
|
+ if (p.request == null) {
|
|
|
+ LOG.warn("Going to commmit null: " + p);
|
|
|
}
|
|
|
+ commit(zxid);
|
|
|
+ zk.commitProcessor.commit(p.request);
|
|
|
+ if(pendingSyncs.containsKey(zxid)){
|
|
|
+ for(FollowerSyncRequest r: pendingSyncs.remove(zxid)) {
|
|
|
+ sendSync(r);
|
|
|
+ }
|
|
|
}
|
|
|
return;
|
|
|
} else {
|
|
|
- first = false;
|
|
|
+ lastCommitted = zxid;
|
|
|
}
|
|
|
}
|
|
|
- LOG.warn("Trying to commit future proposal: zxid 0x"
|
|
|
- + Long.toHexString(zxid) + " from " + followerAddr);
|
|
|
}
|
|
|
|
|
|
static class ToBeAppliedRequestProcessor implements RequestProcessor {
|
|
@@ -514,7 +523,7 @@ public class Leader {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- long lastCommitted;
|
|
|
+ long lastCommitted = -1;
|
|
|
|
|
|
/**
|
|
|
* Create a commit packet and send it to all the members of the quorum
|
|
@@ -537,7 +546,6 @@ public class Leader {
|
|
|
*/
|
|
|
public Proposal propose(Request request) {
|
|
|
|
|
|
-
|
|
|
ByteArrayOutputStream baos = new ByteArrayOutputStream();
|
|
|
BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
|
|
|
try {
|
|
@@ -549,8 +557,8 @@ public class Leader {
|
|
|
} catch (IOException e) {
|
|
|
LOG.warn("This really should be impossible", e);
|
|
|
}
|
|
|
- QuorumPacket pp = new QuorumPacket(Leader.PROPOSAL, request.zxid, baos
|
|
|
- .toByteArray(), null);
|
|
|
+ QuorumPacket pp = new QuorumPacket(Leader.PROPOSAL, request.zxid,
|
|
|
+ baos.toByteArray(), null);
|
|
|
|
|
|
Proposal p = new Proposal();
|
|
|
p.packet = pp;
|
|
@@ -560,8 +568,8 @@ public class Leader {
|
|
|
LOG.debug("Proposing:: " + request);
|
|
|
}
|
|
|
|
|
|
- outstandingProposals.add(p);
|
|
|
lastProposed = p.packet.getZxid();
|
|
|
+ outstandingProposals.put(lastProposed, p);
|
|
|
sendPacket(pp);
|
|
|
}
|
|
|
return p;
|
|
@@ -622,11 +630,13 @@ public class Leader {
|
|
|
.getZxid(), null, null);
|
|
|
handler.queuePacket(qp);
|
|
|
}
|
|
|
- for (Proposal p : outstandingProposals) {
|
|
|
- if (p.packet.getZxid() <= lastSeenZxid) {
|
|
|
+ List<Long>zxids = new ArrayList<Long>(outstandingProposals.keySet());
|
|
|
+ Collections.sort(zxids);
|
|
|
+ for (Long zxid: zxids) {
|
|
|
+ if (zxid <= lastSeenZxid) {
|
|
|
continue;
|
|
|
}
|
|
|
- handler.queuePacket(p.packet);
|
|
|
+ handler.queuePacket(outstandingProposals.get(zxid).packet);
|
|
|
}
|
|
|
}
|
|
|
synchronized (forwardingFollowers) {
|