Browse Source

ZOOKEEPER-1505. Multi-thread CommitProcessor (Jay Shrauner via phunt)

git-svn-id: https://svn.apache.org/repos/asf/zookeeper/trunk@1418555 13f79535-47bb-0310-9956-ffa450edef68
Patrick D. Hunt 12 years ago
parent
commit
e8d3e1d74e

+ 3 - 0
CHANGES.txt

@@ -403,6 +403,9 @@ IMPROVEMENTS:
   ZOOKEEPER-1238. Linger time should be -1 for Netty sockets. (Skye
   W-M via henryr)
 
+  ZOOKEEPER-1505. Multi-thread CommitProcessor (Jay Shrauner via phunt)
+
+
 Release 3.4.0 - 
 
 Non-backward compatible changes:

+ 13 - 13
src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java

@@ -95,22 +95,22 @@ public class FinalRequestProcessor implements RequestProcessor {
         }
         ProcessTxnResult rc = null;
         synchronized (zks.outstandingChanges) {
-            while (!zks.outstandingChanges.isEmpty()
-                    && zks.outstandingChanges.get(0).zxid <= request.zxid) {
-                ChangeRecord cr = zks.outstandingChanges.remove(0);
-                if (cr.zxid < request.zxid) {
-                    LOG.warn("Zxid outstanding "
-                            + cr.zxid
-                            + " is less than current " + request.zxid);
-                }
-                if (zks.outstandingChangesForPath.get(cr.path) == cr) {
-                    zks.outstandingChangesForPath.remove(cr.path);
-                }
-            }
             if (request.getHdr() != null) {
                 TxnHeader hdr = request.getHdr();
                 Record txn = request.getTxn();
-                
+                long zxid = hdr.getZxid();
+                while (!zks.outstandingChanges.isEmpty()
+                       && zks.outstandingChanges.get(0).zxid <= zxid) {
+                    ChangeRecord cr = zks.outstandingChanges.remove(0);
+                    if (cr.zxid < zxid) {
+                        LOG.warn("Zxid outstanding " + cr.zxid
+                                 + " is less than current " + zxid);
+                    }
+                    if (zks.outstandingChangesForPath.get(cr.path) == cr) {
+                        zks.outstandingChangesForPath.remove(cr.path);
+                    }
+                }
+
                 rc = zks.processTxn(hdr, txn);
             }
             // do not add non quorum packets to the queue.

+ 3 - 2
src/java/main/org/apache/zookeeper/server/ServerCnxnFactory.java

@@ -21,8 +21,8 @@ package org.apache.zookeeper.server;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
-import java.util.HashMap;
 import java.util.HashSet;
+import java.util.concurrent.ConcurrentHashMap;
 
 import javax.security.auth.login.Configuration;
 import javax.security.auth.login.LoginException;
@@ -128,7 +128,8 @@ public abstract class ServerCnxnFactory {
 
     public abstract InetSocketAddress getLocalAddress();
 
-    private final HashMap<ServerCnxn, ConnectionBean> connectionBeans = new HashMap<ServerCnxn, ConnectionBean>();
+    private final ConcurrentHashMap<ServerCnxn, ConnectionBean> connectionBeans =
+        new ConcurrentHashMap<ServerCnxn, ConnectionBean>();
 
     protected final HashSet<ServerCnxn> cnxns = new HashSet<ServerCnxn>();
     public void unregisterConnection(ServerCnxn serverCnxn) {

+ 240 - 0
src/java/main/org/apache/zookeeper/server/WorkerService.java

@@ -0,0 +1,240 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server;
+
+import java.util.ArrayList;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * WorkerService is a worker thread pool for running tasks and is implemented
+ * using one or more ExecutorServices. A WorkerService can support assignable
+ * threads, which it does by creating N separate single thread ExecutorServices,
+ * or non-assignable threads, which it does by creating a single N-thread
+ * ExecutorService.
+ *   - NIOServerCnxnFactory uses a non-assignable WorkerService because the
+ *     socket IO requests are order independent and allowing the
+ *     ExecutorService to handle thread assignment gives optimal performance.
+ *   - CommitProcessor uses an assignable WorkerService because requests for
+ *     a given session must be processed in order.
+ * ExecutorService provides queue management and thread restarting, so it's
+ * useful even with a single thread.
+ */
+public class WorkerService {
+    private static final Logger LOG =
+        LoggerFactory.getLogger(WorkerService.class);
+
+    private final ArrayList<ExecutorService> workers =
+        new ArrayList<ExecutorService>();
+
+    private final String threadNamePrefix;
+    private int numWorkerThreads;
+    private boolean threadsAreAssignable;
+    private long shutdownTimeoutMS = 5000;
+
+    private volatile boolean stopped = true;
+
+    /**
+     * @param name                  worker threads are named <name>Thread-##
+     * @param numThreads            number of worker threads (0 - N)
+     *                              If 0, scheduled work is run immediately by
+     *                              the calling thread.
+     * @param useAssignableThreads  whether the worker threads should be
+     *                              individually assignable or not
+     */
+    public WorkerService(String name, int numThreads,
+                         boolean useAssignableThreads) {
+        this.threadNamePrefix = (name == null ? "" : name) + "Thread";
+        this.numWorkerThreads = numThreads;
+        this.threadsAreAssignable = useAssignableThreads;
+        start();
+    }
+
+    /**
+     * Callers should implement a class extending WorkRequest in order to
+     * schedule work with the service.
+     */
+    public static abstract class WorkRequest {
+        /**
+         * Must be implemented. Is called when the work request is run.
+         */
+        public abstract void doWork() throws Exception;
+
+        /**
+         * (Optional) If implemented, is called if the service is stopped
+         * or unable to schedule the request.
+         */
+        public void cleanup() {
+        }
+    }
+
+    /**
+     * Schedule work to be done.  If a worker thread pool is not being
+     * used, work is done directly by this thread. This schedule API is
+     * for use with non-assignable WorkerServices. For assignable
+     * WorkerServices, will always run on the first thread.
+     */
+    public void schedule(WorkRequest workRequest) {
+        schedule(workRequest, 0);
+    }
+
+    /**
+     * Schedule work to be done by the thread assigned to this id. Thread
+     * assignment is a single mod operation on the number of threads.  If a
+     * worker thread pool is not being used, work is done directly by
+     * this thread.
+     */
+    public void schedule(WorkRequest workRequest, long id) {
+        if (stopped) {
+            workRequest.cleanup();
+            return;
+        }
+
+        ScheduledWorkRequest scheduledWorkRequest =
+            new ScheduledWorkRequest(workRequest);
+
+        // If we have a worker thread pool, use that; otherwise, do the work
+        // directly.
+        int size = workers.size();
+        if (size > 0) {
+            try {
+                // make sure to map negative ids as well to [0, size-1]
+                int workerNum = ((int) (id % size) + size) % size;
+                ExecutorService worker = workers.get(workerNum);
+                worker.execute(scheduledWorkRequest);
+            } catch (RejectedExecutionException e) {
+                LOG.warn("ExecutorService rejected execution", e);
+                workRequest.cleanup();
+            }
+        } else {
+            scheduledWorkRequest.run();
+        }
+    }
+
+    private class ScheduledWorkRequest implements Runnable {
+        private final WorkRequest workRequest;
+
+        ScheduledWorkRequest(WorkRequest workRequest) {
+            this.workRequest = workRequest;
+        }
+
+        @Override
+        public void run() {
+            try {
+                // Check if stopped while request was on queue
+                if (stopped) {
+                    workRequest.cleanup();
+                    return;
+                }
+                workRequest.doWork();
+            } catch (Exception e) {
+                LOG.warn("Unexpected exception", e);
+                workRequest.cleanup();
+            }
+        }
+    }
+
+    /**
+     * ThreadFactory for the worker thread pool. We don't use the default
+     * thread factory because (1) we want to give the worker threads easier
+     * to identify names; and (2) we want to make the worker threads daemon
+     * threads so they don't block the server from shutting down.
+     */
+    private static class DaemonThreadFactory implements ThreadFactory {
+        final ThreadGroup group;
+        final AtomicInteger threadNumber = new AtomicInteger(1);
+        final String namePrefix;
+
+        DaemonThreadFactory(String name) {
+            this(name, 1);
+        }
+
+        DaemonThreadFactory(String name, int firstThreadNum) {
+            threadNumber.set(firstThreadNum);
+            SecurityManager s = System.getSecurityManager();
+            group = (s != null)? s.getThreadGroup() :
+                                 Thread.currentThread().getThreadGroup();
+            namePrefix = name + "-";
+        }
+
+        public Thread newThread(Runnable r) {
+            Thread t = new Thread(group, r,
+                                  namePrefix + threadNumber.getAndIncrement(),
+                                  0);
+            if (!t.isDaemon())
+                t.setDaemon(true);
+            if (t.getPriority() != Thread.NORM_PRIORITY)
+                t.setPriority(Thread.NORM_PRIORITY);
+            return t;
+        }
+    }
+
+    public void start() {
+        if (numWorkerThreads > 0) {
+            if (threadsAreAssignable) {
+                for(int i = 1; i <= numWorkerThreads; ++i) {
+                    workers.add(Executors.newFixedThreadPool(
+                        1, new DaemonThreadFactory(threadNamePrefix, i)));
+                }
+            } else {
+                workers.add(Executors.newFixedThreadPool(
+                    numWorkerThreads, new DaemonThreadFactory(threadNamePrefix)));
+            }
+        }
+        stopped = false;
+    }
+
+    public void stop() {
+        stopped = true;
+
+        // Signal for graceful shutdown
+        for(ExecutorService worker : workers) {
+            worker.shutdown();
+        }
+    }
+
+    public void join(long shutdownTimeoutMS) {
+        // Give the worker threads time to finish executing
+        long now = System.currentTimeMillis();
+        long endTime = now + shutdownTimeoutMS;
+        for(ExecutorService worker : workers) {
+            boolean terminated = false;
+            while ((now = System.currentTimeMillis()) <= endTime) {
+                try {
+                    terminated = worker.awaitTermination(
+                        endTime - now, TimeUnit.MILLISECONDS);
+                    break;
+                } catch (InterruptedException e) {
+                    // ignore
+                }
+            }
+            if (!terminated) {
+                // If we've timed out, do a hard shutdown
+                worker.shutdownNow();
+            }
+        }
+    }
+}

+ 246 - 86
src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java

@@ -18,36 +18,88 @@
 
 package org.apache.zookeeper.server.quorum;
 
-import java.util.ArrayList;
-import java.util.LinkedList;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.LinkedBlockingQueue;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.zookeeper.ZooDefs.OpCode;
 import org.apache.zookeeper.server.Request;
 import org.apache.zookeeper.server.RequestProcessor;
+import org.apache.zookeeper.server.WorkerService;
 
 /**
  * This RequestProcessor matches the incoming committed requests with the
  * locally submitted requests. The trick is that locally submitted requests that
  * change the state of the system will come back as incoming committed requests,
  * so we need to match them up.
+ *
+ * The CommitProcessor is multi-threaded. Communication between threads is
+ * handled via queues, atomics, and wait/notifyAll synchronized on the
+ * processor. The CommitProcessor acts as a gateway for allowing requests to
+ * continue with the remainder of the processing pipeline. It will allow many
+ * read requests but only a single write request to be in flight simultaneously,
+ * thus ensuring that write requests are processed in transaction id order.
+ *
+ *   - 1   commit processor main thread, which watches the request queues and
+ *         assigns requests to worker threads based on their sessionId so that
+ *         read and write requests for a particular session are always assigned
+ *         to the same thread (and hence are guaranteed to run in order).
+ *   - 0-N worker threads, which run the rest of the request processor pipeline
+ *         on the requests. If configured with 0 worker threads, the primary
+ *         commit processor thread runs the pipeline directly.
+ *
+ * Typical (default) thread counts are: on a 32 core machine, 1 commit
+ * processor thread and 32 worker threads.
+ *
+ * Multi-threading constraints:
+ *   - Each session's requests must be processed in order.
+ *   - Write requests must be processed in zxid order
+ *   - Must ensure no race condition between writes in one session that would
+ *     trigger a watch being set by a read request in another session
+ *
+ * The current implementation solves the third constraint by simply allowing no
+ * read requests to be processed in parallel with write requests.
  */
 public class CommitProcessor extends Thread implements RequestProcessor {
     private static final Logger LOG = LoggerFactory.getLogger(CommitProcessor.class);
 
+    /** Default: numCores */
+    public static final String ZOOKEEPER_COMMIT_PROC_NUM_WORKER_THREADS =
+        "zookeeper.commitProcessor.numWorkerThreads";
+    /** Default worker pool shutdown timeout in ms: 5000 (5s) */
+    public static final String ZOOKEEPER_COMMIT_PROC_SHUTDOWN_TIMEOUT =
+        "zookeeper.commitProcessor.shutdownTimeout";
+
     /**
      * Requests that we are holding until the commit comes in.
      */
-    LinkedList<Request> queuedRequests = new LinkedList<Request>();
+    private final LinkedBlockingQueue<Request> queuedRequests =
+        new LinkedBlockingQueue<Request>();
 
     /**
      * Requests that have been committed.
      */
-    LinkedList<Request> committedRequests = new LinkedList<Request>();
+    private final LinkedBlockingQueue<Request> committedRequests =
+        new LinkedBlockingQueue<Request>();
+
+    /** Request for which we are currently awaiting a commit */
+    private final AtomicReference<Request> nextPending =
+        new AtomicReference<Request>();
+    /** Request currently being committed (ie, sent off to next processor) */
+    private final AtomicReference<Request> currentlyCommitting =
+        new AtomicReference<Request>();
+
+    /** The number of requests currently being processed */
+    private AtomicInteger numRequestsProcessing = new AtomicInteger(0);
 
     RequestProcessor nextProcessor;
 
+    private volatile boolean stopped = true;
+    private long workerShutdownTimeoutMS;
+    private WorkerService workerPool;
+
     /**
      * This flag indicates whether we need to wait for a response to come back from the
      * leader or we just let the sync operation flow through like a read. The flag will
@@ -55,87 +107,107 @@ public class CommitProcessor extends Thread implements RequestProcessor {
      */
     boolean matchSyncs;
 
-    public CommitProcessor(RequestProcessor nextProcessor, String id, boolean matchSyncs) {
+    public CommitProcessor(RequestProcessor nextProcessor, String id,
+                           boolean matchSyncs) {
         super("CommitProcessor:" + id);
         this.nextProcessor = nextProcessor;
         this.matchSyncs = matchSyncs;
     }
 
-    volatile boolean finished = false;
+    private boolean isProcessingRequest() {
+        return numRequestsProcessing.get() != 0;
+    }
+
+    private boolean isWaitingForCommit() {
+        return nextPending.get() != null;
+    }
+
+    private boolean isProcessingCommit() {
+        return currentlyCommitting.get() != null;
+    }
+
+    protected boolean needCommit(Request request) {
+        switch (request.type) {
+            case OpCode.create:
+            case OpCode.delete:
+            case OpCode.setData:
+            case OpCode.multi:
+            case OpCode.setACL:
+            case OpCode.createSession:
+            case OpCode.closeSession:
+                return true;
+            case OpCode.sync:
+                return matchSyncs;
+            default:
+                return false;
+        }
+    }
 
     @Override
     public void run() {
-        ArrayList<Request> toProcess = new ArrayList<Request>();
+        Request request;
         try {
-            Request nextPending = null;
-            while (!finished) {
-                for (Request request : toProcess) {
-                    nextProcessor.processRequest(request);
-                }
-                toProcess.clear();
-                synchronized (this) {
-                    if ((queuedRequests.isEmpty() || nextPending != null) && committedRequests.isEmpty()) {
+            while (!stopped) {
+                synchronized(this) {
+                    while (
+                        !stopped &&
+                        ((queuedRequests.isEmpty() || isWaitingForCommit() || isProcessingCommit()) &&
+                         (committedRequests.isEmpty() || isProcessingRequest()))) {
                         wait();
-                        continue;
-                    }
-                    // First check and see if the commit came in for the pending request
-                    if ((queuedRequests.isEmpty() || nextPending != null) && !committedRequests.isEmpty()) {
-                        Request r = committedRequests.remove();
-                        /*
-                         * We match with nextPending so that we can move to the
-                         * next request when it is committed. We also want to
-                         * use nextPending because it has the cnxn member set
-                         * properly.
-                         */
-                        if (nextPending != null
-                                && nextPending.sessionId == r.sessionId
-                                && nextPending.cxid == r.cxid) {
-                            // we want to send our version of the request.
-                            // the pointer to the connection in the request
-                            nextPending.setHdr(r.getHdr());
-                            nextPending.setTxn(r.getTxn());
-                            nextPending.zxid = r.zxid;
-                            toProcess.add(nextPending);
-                            nextPending = null;
-                        } else {
-                            // this request came from someone else so just send the commit packet
-                            toProcess.add(r);
-                        }
                     }
                 }
 
-                // We haven't matched the pending requests, so go back to waiting
-                if (nextPending != null) {
-                    continue;
+                /*
+                 * Processing queuedRequests: Process the next requests until we
+                 * find one for which we need to wait for a commit. We cannot
+                 * process a read request while we are processing write request.
+                 */
+                while (!stopped && !isWaitingForCommit() &&
+                       !isProcessingCommit() &&
+                       (request = queuedRequests.poll()) != null) {
+                    if (needCommit(request)) {
+                        nextPending.set(request);
+                    } else {
+                        sendToNextProcessor(request);
+                    }
                 }
 
-                synchronized (this) {
-                    // Process the next requests in the queuedRequests
-                    while (nextPending == null && !queuedRequests.isEmpty()) {
-                        Request request = queuedRequests.remove();
-                        switch (request.type) {
-                        case OpCode.create:
-                        case OpCode.delete:
-                        case OpCode.setData:
-                        case OpCode.multi:
-                        case OpCode.check:
-                        case OpCode.setACL:
-                        case OpCode.createSession:
-                        case OpCode.closeSession:
-                            nextPending = request;
-                            break;
-                        case OpCode.sync:
-                            if (matchSyncs) {
-                                nextPending = request;
-                            } else {
-                                toProcess.add(request);
-                            }
-                            break;
-                        default:
-                            toProcess.add(request);
-                        }
+                /*
+                 * Processing committedRequests: check and see if the commit
+                 * came in for the pending request. We can only commit a
+                 * request when there is no other request being processed.
+                 */
+                if (!stopped && !isProcessingRequest() &&
+                    (request = committedRequests.poll()) != null) {
+                    /*
+                     * We match with nextPending so that we can move to the
+                     * next request when it is committed. We also want to
+                     * use nextPending because it has the cnxn member set
+                     * properly.
+                     */
+                    Request pending = nextPending.get();
+                    if (pending != null &&
+                        pending.sessionId == request.sessionId &&
+                        pending.cxid == request.cxid) {
+                        // we want to send our version of the request.
+                        // the pointer to the connection in the request
+                        pending.setHdr(request.getHdr());
+                        pending.setTxn(request.getTxn());
+                        pending.zxid = request.zxid;
+                        // Set currentlyCommitting so we will block until this
+                        // completes. Cleared by CommitWorkRequest after
+                        // nextProcessor returns.
+                        currentlyCommitting.set(pending);
+                        nextPending.set(null);
+                        sendToNextProcessor(pending);
+                    } else {
+                        // this request came from someone else so just
+                        // send the commit packet
+                        currentlyCommitting.set(request);
+                        sendToNextProcessor(request);
                     }
                 }
+
             }
         } catch (InterruptedException e) {
             LOG.warn("Interrupted exception while waiting", e);
@@ -145,41 +217,129 @@ public class CommitProcessor extends Thread implements RequestProcessor {
         LOG.info("CommitProcessor exited loop!");
     }
 
-    synchronized public void commit(Request request) {
-        if (!finished) {
-            if (request == null) {
-                LOG.warn("Committed a null!",
-                         new Exception("committing a null! "));
-                return;
+    @Override
+    public void start() {
+        int numCores = Runtime.getRuntime().availableProcessors();
+        int numWorkerThreads = Integer.getInteger(
+            ZOOKEEPER_COMMIT_PROC_NUM_WORKER_THREADS, numCores);
+        workerShutdownTimeoutMS = Long.getLong(
+            ZOOKEEPER_COMMIT_PROC_SHUTDOWN_TIMEOUT, 5000);
+
+        LOG.info("Configuring CommitProcessor with "
+                 + (numWorkerThreads > 0 ? numWorkerThreads : "no")
+                 + " worker threads.");
+        if (workerPool == null) {
+            workerPool = new WorkerService(
+                "CommitProcWork", numWorkerThreads, true);
+        }
+        stopped = false;
+        super.start();
+    }
+
+    /**
+     * Schedule final request processing; if a worker thread pool is not being
+     * used, processing is done directly by this thread.
+     */
+    private void sendToNextProcessor(Request request) {
+        numRequestsProcessing.incrementAndGet();
+        workerPool.schedule(new CommitWorkRequest(request), request.sessionId);
+    }
+
+    /**
+     * CommitWorkRequest is a small wrapper class to allow
+     * downstream processing to be run using the WorkerService
+     */
+    private class CommitWorkRequest extends WorkerService.WorkRequest {
+        private final Request request;
+
+        CommitWorkRequest(Request request) {
+            this.request = request;
+        }
+
+        @Override
+        public void cleanup() {
+            if (!stopped) {
+                LOG.error("Exception thrown by downstream processor,"
+                          + " unable to continue.");
+                CommitProcessor.this.halt();
             }
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Committing request:: " + request);
+        }
+
+        public void doWork() throws RequestProcessorException {
+            try {
+                nextProcessor.processRequest(request);
+            } finally {
+                // If this request is the commit request that was blocking
+                // the processor, clear.
+                currentlyCommitting.compareAndSet(request, null);
+
+                /*
+                 * Decrement outstanding request count. The processor may be
+                 * blocked at the moment because it is waiting for the pipeline
+                 * to drain. In that case, wake it up if there are pending
+                 * requests.
+                 */
+                if (numRequestsProcessing.decrementAndGet() == 0) {
+                    if (!queuedRequests.isEmpty() ||
+                        !committedRequests.isEmpty()) {
+                        wakeup();
+                    }
+                }
             }
-            committedRequests.add(request);
-            notifyAll();
         }
     }
 
-    synchronized public void processRequest(Request request) {
+    synchronized private void wakeup() {
+        notifyAll();
+    }
+
+    public void commit(Request request) {
+        if (stopped || request == null) {
+            return;
+        }
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Committing request:: " + request);
+        }
+        committedRequests.add(request);
+        if (!isProcessingCommit()) {
+            wakeup();
+        }
+    }
+
+    public void processRequest(Request request) {
+        if (stopped) {
+            return;
+        }
         if (LOG.isDebugEnabled()) {
             LOG.debug("Processing request:: " + request);
         }
+        queuedRequests.add(request);
+        if (!isWaitingForCommit()) {
+            wakeup();
+        }
+    }
 
-        if (!finished) {
-            queuedRequests.add(request);
-            notifyAll();
+    private void halt() {
+        stopped = true;
+        wakeup();
+        queuedRequests.clear();
+        if (workerPool != null) {
+            workerPool.stop();
         }
     }
 
     public void shutdown() {
         LOG.info("Shutting down");
-        synchronized (this) {
-            finished = true;
-            queuedRequests.clear();
-            notifyAll();
+
+        halt();
+
+        if (workerPool != null) {
+            workerPool.join(workerShutdownTimeoutMS);
         }
+
         if (nextProcessor != null) {
             nextProcessor.shutdown();
         }
     }
+
 }

+ 20 - 6
src/java/main/org/apache/zookeeper/server/quorum/Leader.java

@@ -413,7 +413,7 @@ public class Leader {
                 Thread.sleep(self.tickTime);
                 self.tick++;
             }
-            
+
             /**
              * WARNING: do not use this for anything other than QA testing
              * on a real cluster. Specifically to enable verification that quorum
@@ -421,7 +421,7 @@ public class Leader {
              * ZOOKEEPER-1277. Without this option it would take a very long
              * time (on order of a month say) to see the 4 billion writes
              * necessary to cause the roll-over to occur.
-             * 
+             *
              * This field allows you to override the zxid of the server. Typically
              * you'll want to set it to something like 0xfffffff0 and then
              * start the quorum, run some operations and see the re-election.
@@ -638,9 +638,23 @@ public class Leader {
          */
         public void processRequest(Request request) throws RequestProcessorException {
             next.processRequest(request);
-            Proposal p = leader.toBeApplied.peek();
-            if (p != null && p.request != null && p.request.zxid == request.zxid) {
-                leader.toBeApplied.remove();
+
+            // The only requests that should be on toBeApplied are write
+            // requests, for which we will have a hdr. We can't simply use
+            // request.zxid here because that is set on read requests to equal
+            // the zxid of the last write op.
+            if (request.getHdr() != null) {
+                long zxid = request.getHdr().getZxid();
+                Iterator<Proposal> iter = leader.toBeApplied.iterator();
+                if (iter.hasNext()) {
+                    Proposal p = iter.next();
+                    if (p.request != null && p.request.zxid == zxid) {
+                        iter.remove();
+                        return;
+                    }
+                }
+                LOG.error("Committed request not found on toBeApplied: "
+                          + request);
             }
         }
 
@@ -715,7 +729,7 @@ public class Leader {
     public long getEpoch(){
         return ZxidUtils.getEpochFromZxid(lastProposed);
     }
-    
+
     @SuppressWarnings("serial")
     public static class XidRolloverException extends Exception {
         public XidRolloverException(String message) {

+ 8 - 0
src/java/test/config/findbugsExcludeFile.xml

@@ -131,4 +131,12 @@
     <Method name="writeLongToFile" />
   </Match>
 
+  <!-- this is a helper routine to wakeup the main thread with the
+  state change happening in the routines that call it -->
+  <Match>
+    <Class name="org.apache.zookeeper.server.quorum.CommitProcessor"/>
+    <Bug pattern="NN_NAKED_NOTIFY" />
+    <Method name="wakeup" />
+  </Match>
+
 </FindBugsFilter>

+ 422 - 0
src/java/test/org/apache/zookeeper/server/quorum/CommitProcessorTest.java

@@ -0,0 +1,422 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.quorum;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.jute.BinaryOutputArchive;
+import org.apache.zookeeper.data.Id;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooDefs.OpCode;
+import org.apache.zookeeper.proto.CreateRequest;
+import org.apache.zookeeper.proto.GetDataRequest;
+import org.apache.zookeeper.server.FinalRequestProcessor;
+import org.apache.zookeeper.server.PrepRequestProcessor;
+import org.apache.zookeeper.server.Request;
+import org.apache.zookeeper.server.RequestProcessor;
+import org.apache.zookeeper.server.SessionTracker;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.apache.zookeeper.server.quorum.CommitProcessor;
+import org.apache.zookeeper.test.ClientBase;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The following are invariant regardless of the particular implementation
+ * of the CommitProcessor, and are tested for:
+ *
+ * 1. For each session, requests are processed and the client sees its
+ *    responses in order.
+ * 2. Write requests are processed in zxid order across all sessions.
+ *
+ * The following are also tested for here, but are specific to this
+ * particular implementation. The underlying issue is that watches can be
+ * reset while reading the data. For reads/writes on two different sessions
+ * on different nodes, or with reads that do not set watches, the reads can
+ * happen in any order relative to the writes. For a read in one session that
+ * resets a watch that is triggered by a write on another session, however,
+ * we need to ensure that there is no race condition
+ *
+ * 3. The pipeline needs to be drained before a write request can enter.
+ * 4. No in-flight write requests while processing a read request.
+ */
+public class CommitProcessorTest {
+    protected static final Logger LOG =
+        LoggerFactory.getLogger(CommitProcessorTest.class);
+
+
+    private AtomicInteger processedReadRequests = new AtomicInteger(0);
+    private AtomicInteger processedWriteRequests = new AtomicInteger(0);
+
+    TestZooKeeperServer zks;
+    File tmpDir;
+    ArrayList<TestClientThread> testClients =
+        new ArrayList<TestClientThread>();
+
+    public void setUp(int numCommitThreads, int numClientThreads)
+            throws Exception {
+        System.setProperty(
+            CommitProcessor.ZOOKEEPER_COMMIT_PROC_NUM_WORKER_THREADS,
+            Integer.toString(numCommitThreads));
+        tmpDir = ClientBase.createTmpDir();
+        ClientBase.setupTestEnv();
+        zks = new TestZooKeeperServer(tmpDir, tmpDir, 4000);
+        zks.startup();
+        for(int i=0; i<numClientThreads; ++i) {
+            TestClientThread client = new TestClientThread();
+            testClients.add(client);
+            client.start();
+        }
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        LOG.info("tearDown starting");
+        for(TestClientThread client : testClients) {
+            client.interrupt();
+            client.join();
+        }
+        zks.shutdown();
+
+        if (tmpDir != null) {
+            Assert.assertTrue("delete " + tmpDir.toString(),
+                              ClientBase.recursiveDelete(tmpDir));
+        }
+    }
+
+    private class TestClientThread extends Thread {
+        long sessionId;
+        int cxid;
+        int nodeId;
+
+        public TestClientThread() {
+            sessionId = zks.getSessionTracker().createSession(5000);
+        }
+
+        public void sendWriteRequest() throws Exception {
+            ByteArrayOutputStream boas = new ByteArrayOutputStream();
+            BinaryOutputArchive boa = BinaryOutputArchive.getArchive(boas);
+            CreateRequest createReq = new CreateRequest(
+                "/session" + Long.toHexString(sessionId) + "-" + (++nodeId),
+                new byte[0], Ids.OPEN_ACL_UNSAFE, 1);
+            createReq.serialize(boa, "request");
+            ByteBuffer bb = ByteBuffer.wrap(boas.toByteArray());
+            Request req = new Request(null, sessionId, ++cxid, OpCode.create,
+                                      bb, new ArrayList<Id>());
+            zks.firstProcessor.processRequest(req);
+        }
+
+        public void sendReadRequest() throws Exception {
+            ByteArrayOutputStream boas = new ByteArrayOutputStream();
+            BinaryOutputArchive boa = BinaryOutputArchive.getArchive(boas);
+            GetDataRequest getDataRequest = new GetDataRequest(
+                "/session" + Long.toHexString(sessionId) + "-" + nodeId, false);
+            getDataRequest.serialize(boa, "request");
+            ByteBuffer bb = ByteBuffer.wrap(boas.toByteArray());
+            Request req = new Request(null, sessionId, ++cxid, OpCode.getData,
+                                      bb, new ArrayList<Id>());
+            zks.firstProcessor.processRequest(req);
+        }
+
+        public void run() {
+            Random rand = new Random(Thread.currentThread().getId());
+            try {
+                sendWriteRequest();
+                for(int i=0; i<1000; ++i) {
+                    // Do 25% write / 75% read request mix
+                    if (rand.nextInt(100) < 25) {
+                        sendWriteRequest();
+                    } else {
+                        sendReadRequest();
+                    }
+                }
+            } catch (Exception e) {
+                LOG.error("Uncaught exception in test: ", e);
+            }
+        }
+    }
+
+    @Test
+    public void testNoCommitWorkers() throws Exception {
+        setUp(0, 10);
+        synchronized(this) {
+            wait(5000);
+        }
+        checkProcessedRequest();
+        Assert.assertFalse(fail);
+    }
+
+    @Test
+    public void testOneCommitWorker() throws Exception {
+        setUp(1, 10);
+        synchronized(this) {
+            wait(5000);
+        }
+        checkProcessedRequest();
+        Assert.assertFalse(fail);
+    }
+
+    @Test
+    public void testManyCommitWorkers() throws Exception {
+        setUp(10, 10);
+        synchronized(this) {
+            wait(5000);
+        }
+        checkProcessedRequest();
+        Assert.assertFalse(fail);
+
+    }
+
+    private void checkProcessedRequest() {
+        Assert.assertTrue("No read requests processed",
+                processedReadRequests.get() > 0);
+        Assert.assertTrue("No write requests processed",
+                processedWriteRequests.get() > 0);
+    }
+
+    volatile boolean fail = false;
+    synchronized private void failTest(String reason) {
+        fail = true;
+        notifyAll();
+        Assert.fail(reason);
+    }
+
+    private class TestZooKeeperServer extends ZooKeeperServer {
+        PrepRequestProcessor firstProcessor;
+        CommitProcessor commitProcessor;
+
+        public TestZooKeeperServer(File snapDir, File logDir, int tickTime)
+                throws IOException {
+            super(snapDir, logDir, tickTime);
+        }
+
+        public SessionTracker getSessionTracker() {
+            return sessionTracker;
+        }
+
+        // Leader mock: Prep -> MockProposal -> Commit -> validate -> Final
+        // Have side thread call commitProc.commit()
+        @Override
+        protected void setupRequestProcessors() {
+            RequestProcessor finalProcessor = new FinalRequestProcessor(zks);
+            // ValidateProcessor is set up in a similar fashion to ToBeApplied
+            // processor, so it can do pre/post validating of requests
+            ValidateProcessor validateProcessor =
+                new ValidateProcessor(finalProcessor);
+            commitProcessor = new CommitProcessor(validateProcessor, "1", true);
+            validateProcessor.setCommitProcessor(commitProcessor);
+            commitProcessor.start();
+            MockProposalRequestProcessor proposalProcessor =
+                new MockProposalRequestProcessor(commitProcessor);
+            proposalProcessor.start();
+            firstProcessor = new PrepRequestProcessor(zks, proposalProcessor);
+            firstProcessor.start();
+        }
+    }
+
+    private class MockProposalRequestProcessor extends Thread
+            implements RequestProcessor {
+        private final CommitProcessor commitProcessor;
+        private final LinkedBlockingQueue<Request> proposals =
+            new LinkedBlockingQueue<Request>();
+
+        public MockProposalRequestProcessor(CommitProcessor commitProcessor) {
+            this.commitProcessor = commitProcessor;
+        }
+
+        @Override
+        public void run() {
+            Random rand = new Random(Thread.currentThread().getId());
+            try {
+                while(true) {
+                    Request request = proposals.take();
+                    Thread.sleep(10 + rand.nextInt(190));
+                    commitProcessor.commit(request);
+                }
+            } catch (InterruptedException e) {
+                // ignore
+            }
+        }
+
+        @Override
+        public void processRequest(Request request)
+                throws RequestProcessorException {
+            commitProcessor.processRequest(request);
+            if (request.getHdr() != null) {
+                // fake propose request
+                proposals.add(request);
+            }
+        }
+
+        @Override
+        public void shutdown() {
+            // TODO Auto-generated method stub
+
+        }
+    }
+
+    private class ValidateProcessor implements RequestProcessor {
+        Random rand = new Random(Thread.currentThread().getId());
+        RequestProcessor nextProcessor;
+        CommitProcessor commitProcessor;
+        AtomicLong expectedZxid = new AtomicLong(1);
+        ConcurrentHashMap<Long, AtomicInteger> cxidMap =
+            new ConcurrentHashMap<Long, AtomicInteger>();
+
+        AtomicInteger outstandingReadRequests = new AtomicInteger(0);
+        AtomicInteger outstandingWriteRequests = new AtomicInteger(0);
+
+        public ValidateProcessor(RequestProcessor nextProcessor) {
+            this.nextProcessor = nextProcessor;
+        }
+
+        public void setCommitProcessor(CommitProcessor commitProcessor) {
+            this.commitProcessor = commitProcessor;
+        }
+
+
+        @Override
+        public void processRequest(Request request)
+                throws RequestProcessorException {
+            boolean isWriteRequest = commitProcessor.needCommit(request);
+            if (isWriteRequest) {
+                outstandingWriteRequests.incrementAndGet();
+                validateWriteRequestVariant(request);
+                LOG.debug("Starting write request zxid=" + request.zxid);
+            } else {
+                LOG.debug("Starting read request cxid="
+                        + request.cxid + " for session 0x"
+                        + Long.toHexString(request.sessionId));
+                outstandingReadRequests.incrementAndGet();
+                validateReadRequestVariant(request);
+            }
+
+            // Insert random delay to test thread race conditions
+            try {
+                Thread.sleep(10 + rand.nextInt(290));
+            } catch(InterruptedException e) {
+                // ignore
+            }
+            nextProcessor.processRequest(request);
+
+            /*
+             * The commit workers will have to execute this line before they
+             * wake up the commit processor. So this value is up-to-date when
+             * variant check is performed
+             */
+            if (isWriteRequest) {
+                outstandingWriteRequests.decrementAndGet();
+                LOG.debug("Done write request zxid=" + request.zxid);
+                processedWriteRequests.incrementAndGet();
+            } else {
+                outstandingReadRequests.decrementAndGet();
+                LOG.debug("Done read request cxid="
+                        + request.cxid + " for session 0x"
+                        + Long.toHexString(request.sessionId));
+                processedReadRequests.incrementAndGet();
+            }
+            validateRequest(request);
+        }
+
+        /**
+         * Validate that this is the only request in the pipeline
+         */
+        private void validateWriteRequestVariant(Request request) {
+            long zxid = request.getHdr().getZxid();
+            int readRequests = outstandingReadRequests.get();
+            if (readRequests != 0) {
+                failTest("There are " + readRequests + " outstanding"
+                        + " read requests while issuing a write request zxid="
+                        + zxid);
+            }
+            int writeRequests = outstandingWriteRequests.get();
+            if (writeRequests > 1) {
+                failTest("There are " + writeRequests + " outstanding"
+                        + " write requests while issuing a write request zxid="
+                        + zxid + " (expected one)");
+            }
+        }
+
+        /**
+         * Validate that no write request is in the pipeline while working
+         * on a read request
+         */
+        private void validateReadRequestVariant(Request request) {
+            int writeRequests = outstandingWriteRequests.get();
+            if (writeRequests != 0) {
+                failTest("There are " + writeRequests + " outstanding"
+                        + " write requests while issuing a read request cxid="
+                        + request.cxid + " for session 0x"
+                        + Long.toHexString(request.sessionId));
+            }
+        }
+
+        private void validateRequest(Request request) {
+            LOG.info("Got request " + request);
+
+            // Zxids should always be in order for write requests
+            if (request.getHdr() != null) {
+                long zxid = request.getHdr().getZxid();
+                if (!expectedZxid.compareAndSet(zxid, zxid + 1)) {
+                    failTest("Write request, expected_zxid="
+                             + expectedZxid.get() + "; req_zxid=" + zxid);
+                }
+            }
+
+            // Each session should see its cxids in order
+            AtomicInteger sessionCxid = cxidMap.get(request.sessionId);
+            if (sessionCxid == null) {
+                sessionCxid = new AtomicInteger(request.cxid + 1);
+                AtomicInteger existingSessionCxid =
+                    cxidMap.putIfAbsent(request.sessionId, sessionCxid);
+                if (existingSessionCxid != null) {
+                    failTest("Race condition adding cxid=" + request.cxid
+                             + " for session 0x"
+                             + Long.toHexString(request.sessionId)
+                             + " with other_cxid=" + existingSessionCxid.get());
+                }
+            } else {
+                if (!sessionCxid.compareAndSet(
+                      request.cxid, request.cxid + 1)) {
+                    failTest("Expected_cxid=" + sessionCxid.get()
+                             + "; req_cxid=" + request.cxid);
+                }
+            }
+        }
+
+        @Override
+        public void shutdown() {
+            // TODO Auto-generated method stub
+        }
+    }
+
+}