
ZOOKEEPER-3359: Batch commits in the CommitProcessor

Author: Brian Nixon <nixon@fb.com>

Reviewers: Michael Han <hanm@apache.org>, Norbert Kalmar <nkalmar@yahoo.com>, Enrico Olivelli <eolivelli@gmail.com>

Closes #905 from enixon/commit-proc-batch
Brian Nixon, 6 years ago (commit a6b38f8321)

+ 23 - 0
zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md

@@ -1445,6 +1445,29 @@ Both subsystems need to have sufficient amount of threads to achieve peak read t
     Number of Commit Processor worker threads. If configured with 0 worker threads, the main thread
     will process the request directly. The default value is the number of cpu cores.
 
+* *zookeeper.commitProcessor.maxReadBatchSize* :
+    (Java system property only: **zookeeper.commitProcessor.maxReadBatchSize**)
+    Max number of reads to process from queuedRequests before switching to processing commits.
+    If the value is < 0 (default), we switch whenever we have a local write and pending commits.
+    A high read batch size will delay commit processing, causing stale data to be served.
+    If reads are known to arrive in fixed size batches then matching that batch size with
+    the value of this property can smooth queue performance. Since reads are handled in parallel,
+    one recommendation is to set this property to match *zookeeper.commitProcessor.numWorkerThreads*
+    (default is the number of cpu cores) or lower.
+
+* *zookeeper.commitProcessor.maxCommitBatchSize* :
+    (Java system property only: **zookeeper.commitProcessor.maxCommitBatchSize**)
+    Max number of commits to process before processing reads. We will try to process as many
+    remote/local commits as we can till we reach this count. A high commit batch size will delay
+    reads while processing more commits. A low commit batch size will favor reads.
+    It is recommended to only set this property when an ensemble is serving a workload with a high
+    commit rate. If writes are known to arrive in batches of a set size, then matching that
+    batch size with the value of this property can smooth queue performance. A generic
+    approach would be to set this value to equal the ensemble size so that with the processing
+    of each batch the current server will probabilistically handle a write related to one of
+    its direct clients.
+    Default is "1". Negative and zero values are not supported.
+
 * *znode.container.checkIntervalMs* :
     (Java system property only)
     **New in 3.6.0:** The
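
Both properties are plain Java system properties: they are read when the CommitProcessor is initialized (see initBatchSizes() in the CommitProcessor diff below) and can later be adjusted through the new JMX setters. The sketch below shows how they might be set for an embedded or test server; the values are made-up examples, not recommendations.

```java
// Illustrative sketch only. The properties are read when the CommitProcessor is
// initialized, so on a normal deployment they are passed to the server JVM as
// -Dzookeeper.commitProcessor.maxReadBatchSize=... flags; for an embedded or
// test server they can be set programmatically before startup, as shown here.
public class CommitBatchingConfigSketch {
    public static void main(String[] args) {
        // Example values only; tune against the workload as described above.
        System.setProperty("zookeeper.commitProcessor.maxReadBatchSize", "64");
        System.setProperty("zookeeper.commitProcessor.maxCommitBatchSize", "5");
        // ... start the embedded/test ZooKeeper server after this point ...
    }
}
```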

+ 13 - 0
zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerBean.java

@@ -23,6 +23,7 @@ import java.util.Date;
 import org.apache.jute.BinaryInputArchive;
 import org.apache.zookeeper.Version;
 import org.apache.zookeeper.jmx.ZKMBeanInfo;
+import org.apache.zookeeper.server.quorum.CommitProcessor;
 
 /**
  * This class implements the ZooKeeper server MBean interface.
@@ -281,6 +282,18 @@ public class ZooKeeperServerBean implements ZooKeeperServerMXBean, ZKMBeanInfo {
 
     ///////////////////////////////////////////////////////////////////////////
 
+    public int getCommitProcMaxReadBatchSize() { return CommitProcessor.getMaxReadBatchSize(); }
+
+    public void setCommitProcMaxReadBatchSize(int size) { CommitProcessor.setMaxReadBatchSize(size); }
+
+    ///////////////////////////////////////////////////////////////////////////
+
+    public int getCommitProcMaxCommitBatchSize() { return CommitProcessor.getMaxCommitBatchSize(); }
+
+    public void setCommitProcMaxCommitBatchSize(int size) { CommitProcessor.setMaxCommitBatchSize(size);}
+
+    ///////////////////////////////////////////////////////////////////////////
+
     @Override
     public long getFlushDelay() {
         return zks.getFlushDelay();

+ 6 - 0
zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerMXBean.java

@@ -120,6 +120,12 @@ public interface ZooKeeperServerMXBean {
     public double getConnectionDecreaseRatio();
     public void setConnectionDecreaseRatio(double val);
 
+    public int getCommitProcMaxReadBatchSize();
+    public void setCommitProcMaxReadBatchSize(int size);
+
+    public int getCommitProcMaxCommitBatchSize();
+    public void setCommitProcMaxCommitBatchSize(int size);
+
     public int getRequestThrottleLimit();
     public void setRequestThrottleLimit(int requests);
 
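
Since the bean methods above delegate to static volatile fields in CommitProcessor, both batch sizes can also be adjusted on a live server over JMX. A minimal client sketch using the standard javax.management API follows; the JMX service URL and the MBean ObjectName are placeholders that depend on how JMX is exposed and how the server registered its beans (check with jconsole), not values introduced by this patch.

```java
import javax.management.Attribute;
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class CommitBatchSizeJmxSketch {
    public static void main(String[] args) throws Exception {
        // Placeholder endpoint: assumes remote JMX is enabled on the server JVM.
        JMXServiceURL url =
                new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:9999/jmxrmi");
        try (JMXConnector connector = JMXConnectorFactory.connect(url)) {
            MBeanServerConnection conn = connector.getMBeanServerConnection();
            // Placeholder name: the exact ObjectName depends on the deployment
            // (standalone vs. replicated); discover it via queryNames() or jconsole.
            ObjectName serverBean = new ObjectName(
                    "org.apache.ZooKeeperService:name0=StandaloneServer_port2181");
            // Attribute names follow from the getter/setter pairs added above.
            conn.setAttribute(serverBean, new Attribute("CommitProcMaxCommitBatchSize", 4));
            System.out.println("CommitProcMaxCommitBatchSize is now "
                    + conn.getAttribute(serverBean, "CommitProcMaxCommitBatchSize"));
        }
    }
}
```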

+ 217 - 101
zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/CommitProcessor.java

@@ -19,11 +19,12 @@
 package org.apache.zookeeper.server.quorum;
 
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
-import java.io.IOException;
 import java.util.ArrayDeque;
 import java.util.Deque;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.HashSet;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.LinkedBlockingQueue;
 
@@ -82,6 +83,12 @@ public class CommitProcessor extends ZooKeeperCriticalThread implements
     /** Default worker pool shutdown timeout in ms: 5000 (5s) */
     public static final String ZOOKEEPER_COMMIT_PROC_SHUTDOWN_TIMEOUT =
         "zookeeper.commitProcessor.shutdownTimeout";
+    /** Default max read batch size: -1 to disable the feature */
+    public static final String ZOOKEEPER_COMMIT_PROC_MAX_READ_BATCH_SIZE =
+        "zookeeper.commitProcessor.maxReadBatchSize";
+    /** Default max commit batch size: 1 */
+    public static final String ZOOKEEPER_COMMIT_PROC_MAX_COMMIT_BATCH_SIZE =
+        "zookeeper.commitProcessor.maxCommitBatchSize";
 
     /**
      * Incoming requests.
@@ -89,6 +96,13 @@ public class CommitProcessor extends ZooKeeperCriticalThread implements
     protected LinkedBlockingQueue<Request> queuedRequests =
         new LinkedBlockingQueue<Request>();
 
+    /**
+     * Incoming requests that are waiting on a commit,
+     * contained in order of arrival
+     */
+    protected final LinkedBlockingQueue<Request> queuedWriteRequests =
+            new LinkedBlockingQueue<>();
+
     /**
      * The number of read requests currently held in all session queues
      */
@@ -125,6 +139,23 @@ public class CommitProcessor extends ZooKeeperCriticalThread implements
     protected WorkerService workerPool;
     private Object emptyPoolSync = new Object();
 
+    /**
+     * Max number of reads to process from queuedRequests before switching to
+     * processing commits. If the value is negative, we switch whenever we have
+     * a local write, and pending commits.
+     * A high read batch size will delay commit processing causing us to
+     * serve stale data.
+     */
+    private static volatile int maxReadBatchSize;
+    /**
+     * Max number of commits to process before processing reads. We will try to
+     * process as many remote/local commits as we can till we reach this
+     * count.
+     * A high commit batch size will delay reads while processing more commits.
+     * A low commit batch size will favor reads.
+     */
+    private static volatile int maxCommitBatchSize;
+
     /**
      * This flag indicates whether we need to wait for a response to come back from the
      * leader or we just let the sync operation flow through like a read. The flag will
@@ -209,22 +240,28 @@ public class CommitProcessor extends ZooKeeperCriticalThread implements
 
                 /*
                  * Processing up to requestsToProcess requests from the incoming
-                 * queue (queuedRequests), possibly less if a committed request
-                 * is present along with a pending local write. After the loop,
-                 * we process one committed request if commitIsWaiting.
+                 * queue (queuedRequests). If maxReadBatchSize is set then no
+                 * commits will be processed until maxReadBatchSize number of
+                 * reads are processed (or no more reads remain in the queue).
+                 * After the loop a single committed request is processed if
+                 * one is waiting (or a batch of commits if maxCommitBatchSize
+                 * is set).
                  */
-                Request request = null;
+                Request request;
+                int readsProcessed = 0;
                 while (!stopped && requestsToProcess > 0
+                        && (maxReadBatchSize < 0 || readsProcessed <= maxReadBatchSize)
                         && (request = queuedRequests.poll()) != null) {
                     requestsToProcess--;
                     if (needCommit(request)
                             || pendingRequests.containsKey(request.sessionId)) {
                         // Add request to pending
-                        pendingRequests
-                                .computeIfAbsent(request.sessionId, sid -> new ArrayDeque<>())
-                                .add(request);
-                        ServerMetrics.getMetrics().REQUESTS_IN_SESSION_QUEUE.add(pendingRequests.get(request.sessionId).size());
+                        Deque<Request> requests =
+                                pendingRequests.computeIfAbsent(request.sessionId, sid -> new ArrayDeque<>());
+                        requests.addLast(request);
+                        ServerMetrics.getMetrics().REQUESTS_IN_SESSION_QUEUE.add(requests.size());
                     } else {
+                        readsProcessed++;
                         numReadQueuedRequests.decrementAndGet();
                         sendToNextProcessor(request);
                     }
@@ -237,9 +274,10 @@ public class CommitProcessor extends ZooKeeperCriticalThread implements
                      * the queue, so if we have a pending request and a
                      * committed request, the committed request must be for that
                      * pending write or for a write originating at a different
-                     * server.
+                     * server. We skip this if maxReadBatchSize is set.
                      */
-                    if (!pendingRequests.isEmpty() && !committedRequests.isEmpty()){
+                    if (maxReadBatchSize < 0 &&
+                        !pendingRequests.isEmpty() && !committedRequests.isEmpty()){
                         /*
                          * We set commitIsWaiting so that we won't check
                          * committedRequests again.
@@ -248,91 +286,111 @@ public class CommitProcessor extends ZooKeeperCriticalThread implements
                         break;
                     }
                 }
+                ServerMetrics.getMetrics().READS_ISSUED_IN_COMMIT_PROC.add(readsProcessed);
 
-                // Handle a single committed request
-                if (commitIsWaiting && !stopped){
+                if (!commitIsWaiting) {
+                    commitIsWaiting = !committedRequests.isEmpty();
+                }
+
+                /*
+                 * Handle commits, if any.
+                 */
+                if (commitIsWaiting && !stopped) {
+                    /*
+                     * Drain outstanding reads
+                     */
                     waitForEmptyPool();
 
-                    if (stopped){
+                    if (stopped) {
                         return;
                     }
 
-                    // Process committed head
-                    if ((request = committedRequests.poll()) == null) {
-                        throw new IOException("Error: committed head is null");
-                    }
+                    int commitsToProcess = maxCommitBatchSize;
 
                     /*
-                     * Check if request is pending, if so, update it with the committed info
+                     * Loop through all the commits, and try to drain them.
                      */
-                    Deque<Request> sessionQueue = pendingRequests
-                            .get(request.sessionId);
-                    ServerMetrics.getMetrics().PENDING_SESSION_QUEUE_SIZE.add(pendingRequests.size());
-                    if (sessionQueue != null) {
-                        ServerMetrics.getMetrics().REQUESTS_IN_SESSION_QUEUE.add(sessionQueue.size());
-                        // If session queue != null, then it is also not empty.
-                        Request topPending = sessionQueue.poll();
-                        if (request.cxid != topPending.cxid) {
-                            /*
-                             * TL;DR - we should not encounter this scenario often under normal load.
-                             * We pass the commit to the next processor and put the pending back with a warning.
-                             *
-                             * Generally, we can get commit requests that are not at the queue head after
-                             * a session moved (see ZOOKEEPER-2684). Let's denote the previous server of the session
-                             * with A, and the server that the session moved to with B (keep in mind that it is
-                             * possible that the session already moved from B to a new server C, and maybe C=A).
-                             * 1. If request.cxid < topPending.cxid : this means that the session requested this update
-                             * from A, then moved to B (i.e., which is us), and now B receives the commit
-                             * for the update after the session already performed several operations in B
-                             * (and therefore its cxid is higher than that old request).
-                             * 2. If request.cxid > topPending.cxid : this means that the session requested an updated
-                             * from B with cxid that is bigger than the one we know therefore in this case we
-                             * are A, and we lost the connection to the session. Given that we are waiting for a commit
-                             * for that update, it means that we already sent the request to the leader and it will
-                             * be committed at some point (in this case the order of cxid won't follow zxid, since zxid
-                             * is an increasing order). It is not safe for us to delete the session's queue at this
-                             * point, since it is possible that the session has newer requests in it after it moved
-                             * back to us. We just leave the queue as it is, and once the commit arrives (for the old
-                             * request), the finalRequestProcessor will see a closed cnxn handle, and just won't send a
-                             * response.
-                             * Also note that we don't have a local session, therefore we treat the request
-                             * like any other commit for a remote request, i.e., we perform the update without sending
-                             * a response.
-                             */
-                            LOG.warn("Got request " + request +
-                                    " but we are expecting request " + topPending);
-                            sessionQueue.addFirst(topPending);
-                        } else {
+                    Set<Long> queuesToDrain = new HashSet<>();
+                    long startWriteTime = Time.currentElapsedTime();
+                    int commitsProcessed = 0;
+                    while (commitIsWaiting && !stopped && commitsToProcess > 0) {
+
+                        // Process committed head
+                        request = committedRequests.peek();
+
+                        /*
+                         * Check if a local write request is pending; if so,
+                         * update it with the committed info. If the commit matches
+                         * the first write queued in queuedWriteRequests, we know this is
+                         * a commit for a local write, as commits are received in order. Else
+                         * it must be a commit for a remote write.
+                         */
+                        if (!queuedWriteRequests.isEmpty() &&
+                                queuedWriteRequests.peek().sessionId == request.sessionId &&
+                                queuedWriteRequests.peek().cxid == request.cxid) {
                             /*
-                             * Generally, we want to send to the next processor our version of the request,
-                             * since it contains the session information that is needed for post update processing.
-                             * In more details, when a request is in the local queue, there is (or could be) a client
-                             * attached to this server waiting for a response, and there is other bookkeeping of
-                             * requests that are outstanding and have originated from this server
-                             * (e.g., for setting the max outstanding requests) - we need to update this info when an
-                             * outstanding request completes. Note that in the other case (above), the operation
-                             * originated from a different server and there is no local bookkeeping or a local client
-                             * session that needs to be notified.
+                             * Commit matches the earliest write in our write queue.
                              */
-                            topPending.setHdr(request.getHdr());
-                            topPending.setTxn(request.getTxn());
-                            topPending.zxid = request.zxid;
-                            topPending.commitRecvTime = request.commitRecvTime;
-                            request = topPending;
-
-                            // Only decrement if we take a request off the queue.
-                            numWriteQueuedRequests.decrementAndGet();
+                            Deque<Request> sessionQueue = pendingRequests
+                                    .get(request.sessionId);
+                            ServerMetrics.getMetrics().PENDING_SESSION_QUEUE_SIZE.add(pendingRequests.size());
+                            if (sessionQueue == null || sessionQueue.isEmpty() || !needCommit(sessionQueue.peek())) {
+                                /*
+                                 * Can't process this write yet.
+                                 * Either there are reads pending in this session, or we
+                                 * haven't gotten to this write yet.
+                                 */
+                                break;
+                            } else {
+                                ServerMetrics.getMetrics().REQUESTS_IN_SESSION_QUEUE.add(sessionQueue.size());
+                                // If session queue != null, then it is also not empty.
+                                Request topPending = sessionQueue.poll();
+                                /*
+                                 * Generally, we want to send to the next processor our version of the request,
+                                 * since it contains the session information that is needed for post update processing.
+                                 * In more details, when a request is in the local queue, there is (or could be) a client
+                                 * attached to this server waiting for a response, and there is other bookkeeping of
+                                 * requests that are outstanding and have originated from this server
+                                 * (e.g., for setting the max outstanding requests) - we need to update this info when an
+                                 * outstanding request completes. Note that in the other case, the operation
+                                 * originated from a different server and there is no local bookkeeping or a local client
+                                 * session that needs to be notified.
+                                 */
+                                topPending.setHdr(request.getHdr());
+                                topPending.setTxn(request.getTxn());
+                                topPending.zxid = request.zxid;
+                                topPending.commitRecvTime = request.commitRecvTime;
+                                request = topPending;
+                                // Only decrement if we take a request off the queue.
+                                numWriteQueuedRequests.decrementAndGet();
+                                queuedWriteRequests.poll();
+                                queuesToDrain.add(request.sessionId);
+                            }
                         }
                         }
+                        /*
+                         * Pull the request off the commit queue, now that we are going
+                         * to process it.
+                         */
+                        committedRequests.remove();
+                        commitsToProcess--;
+                        commitsProcessed++;
 
-                    sendToNextProcessor(request);
-                    waitForEmptyPool();
+                        // Process the write inline.
+                        processWrite(request);
+
+                        commitIsWaiting = !committedRequests.isEmpty();
+                    }
+                    ServerMetrics.getMetrics().WRITE_BATCH_TIME_IN_COMMIT_PROCESSOR.add(
+                            Time.currentElapsedTime() - startWriteTime);
+                    ServerMetrics.getMetrics().WRITES_ISSUED_IN_COMMIT_PROC.add(commitsProcessed);
 
                     /*
-                     * Process following reads if any, remove session queue if
+                     * Process following reads if any, remove session queue(s) if
                      * empty.
                      */
-                    if (sessionQueue != null) {
+                    readsProcessed = 0;
+                    for (Long sessionId : queuesToDrain) {
+                        Deque<Request> sessionQueue = pendingRequests.get(sessionId);
                         int readsAfterWrite = 0;
                         while (!stopped && !sessionQueue.isEmpty()
                                 && !needCommit(sessionQueue.peek())) {
@@ -341,12 +399,15 @@ public class CommitProcessor extends ZooKeeperCriticalThread implements
                             readsAfterWrite++;
                         }
                         ServerMetrics.getMetrics().READS_AFTER_WRITE_IN_SESSION_QUEUE.add(readsAfterWrite);
+                        readsProcessed += readsAfterWrite;
 
                         // Remove empty queues
                         if (sessionQueue.isEmpty()) {
-                            pendingRequests.remove(request.sessionId);
+                            pendingRequests.remove(sessionId);
                         }
                     }
+                    ServerMetrics.getMetrics().SESSION_QUEUES_DRAINED.add(queuesToDrain.size());
+                    ServerMetrics.getMetrics().READ_ISSUED_FROM_SESSION_QUEUE.add(readsProcessed);
                 }
 
                 ServerMetrics.getMetrics().COMMIT_PROCESS_TIME.add(Time.currentElapsedTime() - time);
@@ -388,6 +449,8 @@ public class CommitProcessor extends ZooKeeperCriticalThread implements
         workerShutdownTimeoutMS = Long.getLong(
             ZOOKEEPER_COMMIT_PROC_SHUTDOWN_TIMEOUT, 5000);
 
+        initBatchSizes();
+
         LOG.info("Configuring CommitProcessor with "
                  + (numWorkerThreads > 0 ? numWorkerThreads : "no")
                  + " worker threads.");
@@ -409,6 +472,78 @@ public class CommitProcessor extends ZooKeeperCriticalThread implements
         workerPool.schedule(new CommitWorkRequest(request), request.sessionId);
     }
 
+    private void processWrite(Request request) throws RequestProcessorException {
+        processCommitMetrics(request, true);
+
+        long timeBeforeFinalProc = Time.currentElapsedTime();
+        nextProcessor.processRequest(request);
+        ServerMetrics.getMetrics().WRITE_FINAL_PROC_TIME.add(
+                Time.currentElapsedTime() - timeBeforeFinalProc);
+    }
+
+    private static void initBatchSizes() {
+        maxReadBatchSize = Integer.getInteger(
+                ZOOKEEPER_COMMIT_PROC_MAX_READ_BATCH_SIZE, -1);
+        maxCommitBatchSize = Integer.getInteger(
+                ZOOKEEPER_COMMIT_PROC_MAX_COMMIT_BATCH_SIZE, 1);
+
+        if (maxCommitBatchSize <= 0) {
+            String errorMsg = "maxCommitBatchSize must be positive, was " +
+                    maxCommitBatchSize;
+            throw new IllegalArgumentException(errorMsg);
+        }
+
+        LOG.info("Configuring CommitProcessor with readBatchSize {} commitBatchSize {}",
+                maxReadBatchSize,
+                maxCommitBatchSize);
+    }
+
+    private static void processCommitMetrics(Request request, boolean isWrite) {
+        if (isWrite) {
+            if (request.commitProcQueueStartTime != -1 &&
+                    request.commitRecvTime != -1) {
+                // Locally issued writes.
+                long currentTime = Time.currentElapsedTime();
+                ServerMetrics.getMetrics().WRITE_COMMITPROC_TIME.add(currentTime -
+                        request.commitProcQueueStartTime);
+                ServerMetrics.getMetrics().LOCAL_WRITE_COMMITTED_TIME.add(currentTime -
+                        request.commitRecvTime);
+            } else if (request.commitRecvTime != -1) {
+                // Writes issued by other servers.
+                ServerMetrics.getMetrics().SERVER_WRITE_COMMITTED_TIME.add(
+                        Time.currentElapsedTime() - request.commitRecvTime);
+            }
+        } else {
+            if (request.commitProcQueueStartTime != -1) {
+                ServerMetrics.getMetrics().READ_COMMITPROC_TIME.add(
+                        Time.currentElapsedTime() -
+                                request.commitProcQueueStartTime);
+            }
+        }
+    }
+
+    public static int getMaxReadBatchSize() {
+        return maxReadBatchSize;
+    }
+
+    public static int getMaxCommitBatchSize() {
+        return maxCommitBatchSize;
+    }
+
+    public static void setMaxReadBatchSize(int size) {
+        maxReadBatchSize = size;
+        LOG.info("Configuring CommitProcessor with readBatchSize {}",
+                 maxReadBatchSize);
+    }
+
+    public static void setMaxCommitBatchSize(int size) {
+        if (size > 0) {
+            maxCommitBatchSize = size;
+            LOG.info("Configuring CommitProcessor with commitBatchSize {}",
+                     maxCommitBatchSize);
+        }
+    }
+
     /**
      * CommitWorkRequest is a small wrapper class to allow
      * downstream processing to be run using the WorkerService
@@ -431,27 +566,7 @@ public class CommitProcessor extends ZooKeeperCriticalThread implements
 
         public void doWork() throws RequestProcessorException {
             try {
-                if (needCommit(request)) {
-                    if (request.commitProcQueueStartTime != -1 &&
-                            request.commitRecvTime != -1) {
-                        // Locally issued writes.
-                        long currentTime = Time.currentElapsedTime();
-                        ServerMetrics.getMetrics().WRITE_COMMITPROC_TIME.add(currentTime -
-                                request.commitProcQueueStartTime);
-                        ServerMetrics.getMetrics().LOCAL_WRITE_COMMITTED_TIME.add(currentTime -
-                                request.commitRecvTime);
-                    } else if (request.commitRecvTime != -1) {
-                        // Writes issued by other servers.
-                        ServerMetrics.getMetrics().SERVER_WRITE_COMMITTED_TIME.add(
-                                Time.currentElapsedTime() - request.commitRecvTime);
-                    }
-                } else {
-                    if (request.commitProcQueueStartTime != -1) {
-                        ServerMetrics.getMetrics().READ_COMMITPROC_TIME.add(
-                                Time.currentElapsedTime() -
-                                        request.commitProcQueueStartTime);
-                    }
-                }
+                processCommitMetrics(request, needCommit(request));
 
                 long timeBeforeFinalProc = Time.currentElapsedTime();
                 nextProcessor.processRequest(request);
@@ -508,6 +623,7 @@ public class CommitProcessor extends ZooKeeperCriticalThread implements
         queuedRequests.add(request);
         // If the request will block, add it to the queue of blocking requests
         if (needCommit(request)) {
+            queuedWriteRequests.add(request);
             numWriteQueuedRequests.incrementAndGet();
         } else {
             numReadQueuedRequests.incrementAndGet();

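For readers skimming the diff, the new loop structure boils down to: take up to maxReadBatchSize reads off queuedRequests, then drain up to maxCommitBatchSize committed writes, then flush any reads that were queued behind those writes in their session queues. The self-contained sketch below illustrates only that interleaving; it deliberately leaves out sessions, pending queues, and the worker pool, and the queue contents are invented for the example.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Toy model of the read/commit interleaving controlled by the two new knobs.
// It is NOT the CommitProcessor; it only shows the order in which a single
// pass alternates between a bounded batch of reads and a bounded batch of commits.
public class ReadCommitInterleavingSketch {
    public static void main(String[] args) {
        int maxReadBatchSize = 2;    // hypothetical settings for the illustration
        int maxCommitBatchSize = 3;

        Queue<String> queuedRequests = new ArrayDeque<>();     // incoming reads
        Queue<String> committedRequests = new ArrayDeque<>();  // committed writes
        for (int i = 0; i < 5; i++) queuedRequests.add("read-" + i);
        for (int i = 0; i < 4; i++) committedRequests.add("write-" + i);

        while (!queuedRequests.isEmpty() || !committedRequests.isEmpty()) {
            // Phase 1: issue at most maxReadBatchSize reads before looking at commits.
            String req;
            int reads = 0;
            while (reads < maxReadBatchSize && (req = queuedRequests.poll()) != null) {
                System.out.println("process " + req);
                reads++;
            }
            // Phase 2: apply at most maxCommitBatchSize committed writes in one batch.
            String commit;
            int commits = 0;
            while (commits < maxCommitBatchSize && (commit = committedRequests.poll()) != null) {
                System.out.println("process " + commit);
                commits++;
            }
        }
    }
}
```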
+ 159 - 0
zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/CommitProcessorConcurrencyTest.java

@@ -62,6 +62,8 @@ public class CommitProcessorConcurrencyTest extends ZKTestCase {
     public void setUp() throws Exception {
         processedRequests = new LinkedBlockingQueue<Request>();
         processor = new MockCommitProcessor();
+        CommitProcessor.setMaxReadBatchSize(-1);
+        CommitProcessor.setMaxCommitBatchSize(1);
     }
 
     @After
@@ -148,6 +150,7 @@ public class CommitProcessorConcurrencyTest extends ZKTestCase {
         processor.committedRequests.add(writeReq);
         processor.queuedRequests.add(readReq);
         processor.queuedRequests.add(writeReq);
+        processor.queuedWriteRequests.add(writeReq);
         processor.initThreads(1);
 
         processor.stoppedMainLoop = true;
@@ -194,6 +197,7 @@ public class CommitProcessorConcurrencyTest extends ZKTestCase {
             Request readReq = newRequest(new GetDataRequest(path, false),
                     OpCode.getData, sessionId, sessionId + 2);
             processor.queuedRequests.add(writeReq);
+            processor.queuedWriteRequests.add(writeReq);
             processor.queuedRequests.add(readReq);
             shouldNotBeProcessed.add(writeReq);
             shouldNotBeProcessed.add(readReq);
@@ -232,6 +236,7 @@ public class CommitProcessorConcurrencyTest extends ZKTestCase {
                         CreateMode.PERSISTENT_SEQUENTIAL.toFlag()),
                 OpCode.create, 0x1, 1);
         processor.queuedRequests.add(writeReq);
+        processor.queuedWriteRequests.add(writeReq);
         shouldBeInPending.add(writeReq);
 
         for (int readReqId = 2; readReqId <= 5; ++readReqId) {
@@ -249,6 +254,8 @@ public class CommitProcessorConcurrencyTest extends ZKTestCase {
                 processedRequests.isEmpty());
         Assert.assertTrue("Did not handled all of queuedRequests' requests",
                 processor.queuedRequests.isEmpty());
+        Assert.assertTrue("Removed from blockedQueuedRequests before commit",
+                !processor.queuedWriteRequests.isEmpty());
 
         shouldBeInPending
                 .removeAll(processor.pendingRequests.get(writeReq.sessionId));
@@ -273,6 +280,155 @@ public class CommitProcessorConcurrencyTest extends ZKTestCase {
                 processor.committedRequests.isEmpty());
         Assert.assertTrue("Did not process committed request",
                 processor.pendingRequests.isEmpty());
+        Assert.assertTrue("Did not remove from blockedQueuedRequests",
+                processor.queuedWriteRequests.isEmpty());
+    }
+
+    /**
+     * In the following test, we add a write request followed by several read
+     * requests of the same session. We will do this for 2 sessions. For the
+     * second session, we will queue up another write after the reads, and
+     * we verify several things - 1. The writes are not processed until
+     * the commits arrive. 2. Only 2 writes are processed, with maxCommitBatchSize
+     * of 3, due to the blocking reads. 3. Once the writes are processed,
+     * all the read requests are processed as well. 4. All read requests are
+     * executed after the write, before any other write for that session,
+     * along with new reads. 5. Then we add another read for session 1, and
+     * another write and commit for session 2. 6. Only the old write, and the read
+     * are processed, leaving the commit in the queue. 7. Last write is executed
+     * in the last iteration, and all lists are empty.
+     */
+    @Test
+    public void processAllWritesMaxBatchSize()
+            throws Exception {
+        final String path = "/processAllWritesMaxBatchSize";
+        HashSet<Request> shouldBeProcessedAfterPending = new HashSet<Request>();
+
+        Request writeReq = newRequest(
+                new CreateRequest(path + "_1", new byte[0], Ids.OPEN_ACL_UNSAFE,
+                        CreateMode.PERSISTENT_SEQUENTIAL.toFlag()),
+                OpCode.create, 0x1, 1);
+        processor.queuedRequests.add(writeReq);
+        processor.queuedWriteRequests.add(writeReq);
+
+        Request writeReq2 = newRequest(
+                new CreateRequest(path + "_2", new byte[0], Ids.OPEN_ACL_UNSAFE,
+                        CreateMode.PERSISTENT_SEQUENTIAL.toFlag()),
+                OpCode.create, 0x2, 1);
+        processor.queuedRequests.add(writeReq2);
+        processor.queuedWriteRequests.add(writeReq2);
+
+        for (int readReqId = 2; readReqId <= 5; ++readReqId) {
+            Request readReq = newRequest(new GetDataRequest(path, false),
+                    OpCode.getData, 0x1, readReqId);
+            Request readReq2 = newRequest(new GetDataRequest(path, false),
+                    OpCode.getData, 0x2, readReqId);
+            processor.queuedRequests.add(readReq);
+            shouldBeProcessedAfterPending.add(readReq);
+            processor.queuedRequests.add(readReq2);
+            shouldBeProcessedAfterPending.add(readReq2);
+        }
+
+        Request writeReq3 = newRequest(
+                new CreateRequest(path + "_3", new byte[0], Ids.OPEN_ACL_UNSAFE,
+                        CreateMode.PERSISTENT_SEQUENTIAL.toFlag()),
+                OpCode.create, 0x2, 6);
+        processor.queuedRequests.add(writeReq3);
+        processor.queuedWriteRequests.add(writeReq3);
+
+        processor.initThreads(defaultSizeOfThreadPool);
+
+        processor.stoppedMainLoop = true;
+        CommitProcessor.setMaxCommitBatchSize(2);
+        processor.run();
+        Assert.assertTrue("Processed without waiting for commit",
+                processedRequests.isEmpty());
+        Assert.assertTrue("Did not handled all of queuedRequests' requests",
+                processor.queuedRequests.isEmpty());
+        Assert.assertTrue("Removed from blockedQueuedRequests before commit",
+                !processor.queuedWriteRequests.isEmpty());
+        Assert.assertTrue("Missing session 1 in pending queue",
+                processor.pendingRequests.containsKey(writeReq.sessionId));
+        Assert.assertTrue("Missing session 2 in pending queue",
+                processor.pendingRequests.containsKey(writeReq2.sessionId));
+
+        processor.committedRequests.add(writeReq);
+        processor.committedRequests.add(writeReq2);
+        processor.committedRequests.add(writeReq3);
+        processor.stoppedMainLoop = true;
+        CommitProcessor.setMaxCommitBatchSize(3);
+        processor.run();
+        processor.initThreads(defaultSizeOfThreadPool);
+
+        Thread.sleep(500);
+        Assert.assertTrue("Did not process committed request",
+                processedRequests.peek() == writeReq);
+        Assert.assertTrue("Did not process following read request",
+                processedRequests.containsAll(shouldBeProcessedAfterPending));
+        Assert.assertTrue("Processed committed request",
+                !processor.committedRequests.isEmpty());
+        Assert.assertTrue("Removed commit for write req 3",
+                processor.committedRequests.peek() == writeReq3);
+        Assert.assertTrue("Processed committed request",
+                !processor.pendingRequests.isEmpty());
+        Assert.assertTrue("Missing session 2 in pending queue",
+                processor.pendingRequests.containsKey(writeReq3.sessionId));
+        Assert.assertTrue("Missing write 3 in pending queue",
+                processor.pendingRequests.get(writeReq3.sessionId).peek() == writeReq3);
+        Assert.assertTrue("Removed from blockedQueuedRequests",
+                !processor.queuedWriteRequests.isEmpty());
+        Assert.assertTrue("Removed write req 3 from blockedQueuedRequests",
+                processor.queuedWriteRequests.peek() == writeReq3);
+
+        Request readReq3 = newRequest(new GetDataRequest(path, false),
+                OpCode.getData, 0x1, 7);
+        processor.queuedRequests.add(readReq3);
+        shouldBeProcessedAfterPending.add(readReq3);
+        Request writeReq4 = newRequest(
+                new CreateRequest(path + "_4", new byte[0], Ids.OPEN_ACL_UNSAFE,
+                        CreateMode.PERSISTENT_SEQUENTIAL.toFlag()),
+                OpCode.create, 0x2, 7);
+
+        processor.queuedRequests.add(writeReq4);
+        processor.queuedWriteRequests.add(writeReq4);
+        processor.committedRequests.add(writeReq4);
+
+        processor.stoppedMainLoop = true;
+        CommitProcessor.setMaxCommitBatchSize(3);
+        processor.run();
+        processor.initThreads(defaultSizeOfThreadPool);
+
+        Thread.sleep(500);
+        Assert.assertTrue("Did not process committed request",
+                processedRequests.peek() == writeReq);
+        Assert.assertTrue("Did not process following read request",
+                processedRequests.containsAll(shouldBeProcessedAfterPending));
+        Assert.assertTrue("Processed unexpected committed request",
+                !processor.committedRequests.isEmpty());
+        Assert.assertTrue("Unexpected pending request",
+                processor.pendingRequests.isEmpty());
+        Assert.assertTrue("Removed from blockedQueuedRequests",
+                !processor.queuedWriteRequests.isEmpty());
+        Assert.assertTrue("Removed write req 4 from blockedQueuedRequests",
+                processor.queuedWriteRequests.peek() == writeReq4);
+
+        processor.stoppedMainLoop = true;
+        CommitProcessor.setMaxCommitBatchSize(3);
+        processor.run();
+        processor.initThreads(defaultSizeOfThreadPool);
+
+        Thread.sleep(500);
+        Assert.assertTrue("Did not process committed request",
+                processedRequests.peek() == writeReq);
+        Assert.assertTrue("Did not process following read request",
+                processedRequests.containsAll(shouldBeProcessedAfterPending));
+        Assert.assertTrue("Did not process committed request",
+                processor.committedRequests.isEmpty());
+        Assert.assertTrue("Did not process committed request",
+                processor.pendingRequests.isEmpty());
+        Assert.assertTrue("Did not remove from blockedQueuedRequests",
+                processor.queuedWriteRequests.isEmpty());
+
     }
 
     /**
@@ -322,6 +478,7 @@ public class CommitProcessorConcurrencyTest extends ZKTestCase {
                         CreateMode.PERSISTENT_SEQUENTIAL.toFlag()),
                 OpCode.create, 0x3, 1);
         processor.queuedRequests.add(firstCommittedReq);
+        processor.queuedWriteRequests.add(firstCommittedReq);
         processor.committedRequests.add(firstCommittedReq);
         Set<Request> allReads = new HashSet<Request>();
 
@@ -399,6 +556,7 @@ public class CommitProcessorConcurrencyTest extends ZKTestCase {
                         CreateMode.PERSISTENT_SEQUENTIAL.toFlag()),
                 OpCode.create, sessionid, readReqId++);
         processor.queuedRequests.add(firstCommittedReq);
+        processor.queuedWriteRequests.add(firstCommittedReq);
         localRequests.add(firstCommittedReq);
 
         // queue read requests to queuedRequests
@@ -463,6 +621,7 @@ public class CommitProcessorConcurrencyTest extends ZKTestCase {
                         CreateMode.PERSISTENT_SEQUENTIAL.toFlag()),
                 OpCode.create, sessionid, lastCXid);
         processor.queuedRequests.add(orphanCommittedReq);
+        processor.queuedWriteRequests.add(orphanCommittedReq);
         localRequests.add(orphanCommittedReq);
 
         // queue read requests to queuedRequests

+ 4 - 0
zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/CommitProcessorMetricsTest.java

@@ -51,6 +51,10 @@ public class CommitProcessorMetricsTest extends ZKTestCase {
     public void setup() {
         LOG.info("setup");
         ServerMetrics.getMetrics().resetAll();
+
+        // ensure no leaked parallelism properties
+        System.clearProperty("zookeeper.commitProcessor.maxReadBatchSize");
+        System.clearProperty("zookeeper.commitProcessor.maxCommitBatchSize");
     }
     }
 
     public void setupProcessors(int commitWorkers, int finalProcTime ) {