|
@@ -95,6 +95,7 @@ public class Learner {
|
|
|
return sock;
|
|
|
}
|
|
|
|
|
|
+ LearnerSender sender = null;
|
|
|
protected InputArchive leaderIs;
|
|
|
protected OutputArchive leaderOs;
|
|
|
/** the protocol version of the leader */
|
|
@@ -113,9 +114,12 @@ public class Learner {
|
|
|
|
|
|
private static final boolean nodelay = System.getProperty("follower.nodelay", "true").equals("true");
|
|
|
|
|
|
+ public static final String LEARNER_ASYNC_SENDING = "learner.asyncSending";
|
|
|
+ private static boolean asyncSending = Boolean.getBoolean(LEARNER_ASYNC_SENDING);
|
|
|
static {
|
|
|
LOG.info("leaderConnectDelayDuringRetryMs: {}", leaderConnectDelayDuringRetryMs);
|
|
|
LOG.info("TCP NoDelay set to: {}", nodelay);
|
|
|
+ LOG.info("{} = {}", LEARNER_ASYNC_SENDING, asyncSending);
|
|
|
}
|
|
|
|
|
|
final ConcurrentHashMap<Long, ServerCnxn> pendingRevalidations = new ConcurrentHashMap<Long, ServerCnxn>();
|
|
@@ -124,6 +128,15 @@ public class Learner {
|
|
|
return pendingRevalidations.size();
|
|
|
}
|
|
|
|
|
|
+ // for testing
|
|
|
+ protected static void setAsyncSending(boolean newMode) {
|
|
|
+ asyncSending = newMode;
|
|
|
+ LOG.info("{} = {}", LEARNER_ASYNC_SENDING, asyncSending);
|
|
|
+
|
|
|
+ }
|
|
|
+ protected static boolean getAsyncSending() {
|
|
|
+ return asyncSending;
|
|
|
+ }
|
|
|
/**
|
|
|
* validate a session for a client
|
|
|
*
|
|
@@ -152,13 +165,27 @@ public class Learner {
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * write a packet to the leader
|
|
|
+ * write a packet to the leader.
|
|
|
+ *
|
|
|
+ * This method is called by multiple threads. We need to make sure that only one thread is writing to leaderOs at a time.
|
|
|
+ * When packets are sent synchronously, writing is done within a synchronization block.
|
|
|
+ * When packets are sent asynchronously, sender.queuePacket() is called, which writes to a BlockingQueue, which is thread-safe.
|
|
|
+ * Reading from this BlockingQueue and writing to leaderOs is the learner sender thread only.
|
|
|
+ * So we have only one thread writing to leaderOs at a time in either case.
|
|
|
*
|
|
|
* @param pp
|
|
|
* the proposal packet to be sent to the leader
|
|
|
* @throws IOException
|
|
|
*/
|
|
|
void writePacket(QuorumPacket pp, boolean flush) throws IOException {
|
|
|
+ if (asyncSending) {
|
|
|
+ sender.queuePacket(pp);
|
|
|
+ } else {
|
|
|
+ writePacketNow(pp, flush);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ void writePacketNow(QuorumPacket pp, boolean flush) throws IOException {
|
|
|
synchronized (leaderOs) {
|
|
|
if (pp != null) {
|
|
|
messageTracker.trackSent(pp.getType());
|
|
@@ -170,6 +197,14 @@ public class Learner {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Start thread that will forward any packet in the queue to the leader
|
|
|
+ */
|
|
|
+ protected void startSendingThread() {
|
|
|
+ sender = new LearnerSender(this);
|
|
|
+ sender.start();
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* read a packet from the leader
|
|
|
*
|
|
@@ -303,6 +338,9 @@ public class Learner {
|
|
|
leaderIs = BinaryInputArchive.getArchive(new BufferedInputStream(sock.getInputStream()));
|
|
|
bufferedOutput = new BufferedOutputStream(sock.getOutputStream());
|
|
|
leaderOs = BinaryOutputArchive.getArchive(bufferedOutput);
|
|
|
+ if (asyncSending) {
|
|
|
+ startSendingThread();
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
class LeaderConnector implements Runnable {
|
|
@@ -774,8 +812,9 @@ public class Learner {
|
|
|
dos.writeLong(entry.getKey());
|
|
|
dos.writeInt(entry.getValue());
|
|
|
}
|
|
|
- qp.setData(bos.toByteArray());
|
|
|
- writePacket(qp, true);
|
|
|
+
|
|
|
+ QuorumPacket pingReply = new QuorumPacket(qp.getType(), qp.getZxid(), bos.toByteArray(), qp.getAuthinfo());
|
|
|
+ writePacket(pingReply, true);
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -785,6 +824,11 @@ public class Learner {
|
|
|
self.setZooKeeperServer(null);
|
|
|
self.closeAllConnections();
|
|
|
self.adminServer.setZooKeeperServer(null);
|
|
|
+
|
|
|
+ if (sender != null) {
|
|
|
+ sender.shutdown();
|
|
|
+ }
|
|
|
+
|
|
|
closeSocket();
|
|
|
// shutdown previous zookeeper
|
|
|
if (zk != null) {
|