Browse Source

ZOOKEEPER-3268: Add commit processor metrics

Author: Jie Huang <jiehuang@fb.com>

Reviewers: fangmin@apache.org, evolivelli@aparche.org, andor@apache.org

Closes #800 from jhuan31/ZOOKEEPER-3268
Jie Huang 6 năm trước cách đây
mục cha
commit
cb9727dc5b

+ 4 - 0
zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java

@@ -80,6 +80,10 @@ public class Request {
 
     public long prepQueueStartTime= -1;
 
+    public long commitProcQueueStartTime = -1;
+
+    public long commitRecvTime = -1;
+
     private Object owner;
 
     private KeeperException e;

+ 110 - 0
zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java

@@ -131,6 +131,61 @@ public final class ServerMetrics {
         SESSIONLESS_CONNECTIONS_EXPIRED = metricsContext.getCounter("sessionless_connections_expired");
         STALE_SESSIONS_EXPIRED = metricsContext.getCounter("stale_sessions_expired");
 
+        /*
+         * Number of requests that are in the session queue.
+         */
+        REQUESTS_IN_SESSION_QUEUE = metricsContext.getSummary("requests_in_session_queue", DetailLevel.BASIC);
+        PENDING_SESSION_QUEUE_SIZE = metricsContext.getSummary("pending_session_queue_size", DetailLevel.BASIC);
+        /*
+         * Consecutive number of read requests that are in the session queue right after a commit request.
+         */
+        READS_AFTER_WRITE_IN_SESSION_QUEUE = metricsContext.getSummary("reads_after_write_in_session_queue", DetailLevel.BASIC);
+        READ_ISSUED_FROM_SESSION_QUEUE = metricsContext.getSummary("reads_issued_from_session_queue", DetailLevel.BASIC);
+        SESSION_QUEUES_DRAINED = metricsContext.getSummary("session_queues_drained", DetailLevel.BASIC);
+
+        TIME_WAITING_EMPTY_POOL_IN_COMMIT_PROCESSOR_READ = metricsContext.getSummary("time_waiting_empty_pool_in_commit_processor_read_ms", DetailLevel.BASIC);
+        WRITE_BATCH_TIME_IN_COMMIT_PROCESSOR = metricsContext.getSummary("write_batch_time_in_commit_processor", DetailLevel.BASIC);
+
+        CONCURRENT_REQUEST_PROCESSING_IN_COMMIT_PROCESSOR = metricsContext.getSummary("concurrent_request_processing_in_commit_processor", DetailLevel.BASIC);
+
+        READS_QUEUED_IN_COMMIT_PROCESSOR = metricsContext.getSummary("read_commit_proc_req_queued", DetailLevel.BASIC);
+        WRITES_QUEUED_IN_COMMIT_PROCESSOR = metricsContext.getSummary("write_commit_proc_req_queued", DetailLevel.BASIC);
+        COMMITS_QUEUED_IN_COMMIT_PROCESSOR = metricsContext.getSummary("commit_commit_proc_req_queued", DetailLevel.BASIC);
+        COMMITS_QUEUED = metricsContext.getCounter("request_commit_queued");
+        READS_ISSUED_IN_COMMIT_PROC = metricsContext.getSummary("read_commit_proc_issued", DetailLevel.BASIC);
+        WRITES_ISSUED_IN_COMMIT_PROC = metricsContext.getSummary("write_commit_proc_issued", DetailLevel.BASIC);
+
+        /**
+         * Time spent by a read request in the commit processor.
+         */
+        READ_COMMITPROC_TIME = metricsContext.getSummary("read_commitproc_time_ms", DetailLevel.ADVANCED);
+
+        /**
+         * Time spent by a write request in the commit processor.
+         */
+        WRITE_COMMITPROC_TIME = metricsContext.getSummary("write_commitproc_time_ms", DetailLevel.ADVANCED);
+
+        /**
+         * Time spent by a committed request, for a locally issued write, in the
+         * commit processor.
+         */
+        LOCAL_WRITE_COMMITTED_TIME = metricsContext.getSummary("local_write_committed_time_ms", DetailLevel.ADVANCED);
+
+        /**
+         * Time spent by a committed request for a write, issued by other server, in the
+         * commit processor.
+         */
+        SERVER_WRITE_COMMITTED_TIME = metricsContext.getSummary("server_write_committed_time_ms", DetailLevel.ADVANCED);
+
+        COMMIT_PROCESS_TIME = metricsContext.getSummary("commit_process_time", DetailLevel.BASIC);
+
+
+        /**
+         * Time spent by the final processor. This is tracked in the commit processor.
+         */
+        READ_FINAL_PROC_TIME = metricsContext.getSummary("read_final_proc_time_ms", DetailLevel.ADVANCED);
+        WRITE_FINAL_PROC_TIME = metricsContext.getSummary("write_final_proc_time_ms", DetailLevel.ADVANCED);
+
     }
 
     /**
@@ -220,6 +275,61 @@ public final class ServerMetrics {
     public final Counter RESPONSE_PACKET_CACHE_HITS;
     public final Counter RESPONSE_PACKET_CACHE_MISSING;
 
+    /*
+     * Number of requests that are in the session queue.
+     */
+    public final Summary REQUESTS_IN_SESSION_QUEUE;
+    public final Summary PENDING_SESSION_QUEUE_SIZE;
+    /*
+     * Consecutive number of read requests that are in the session queue right after a commit request.
+     */
+    public final Summary READS_AFTER_WRITE_IN_SESSION_QUEUE;
+    public final Summary READ_ISSUED_FROM_SESSION_QUEUE;
+    public final Summary SESSION_QUEUES_DRAINED;
+
+    public final Summary TIME_WAITING_EMPTY_POOL_IN_COMMIT_PROCESSOR_READ;
+    public final Summary WRITE_BATCH_TIME_IN_COMMIT_PROCESSOR;
+
+    public final Summary CONCURRENT_REQUEST_PROCESSING_IN_COMMIT_PROCESSOR;
+
+    public final Summary READS_QUEUED_IN_COMMIT_PROCESSOR;
+    public final Summary WRITES_QUEUED_IN_COMMIT_PROCESSOR;
+    public final Summary COMMITS_QUEUED_IN_COMMIT_PROCESSOR;
+    public final Counter COMMITS_QUEUED;
+    public final Summary READS_ISSUED_IN_COMMIT_PROC;
+    public final Summary WRITES_ISSUED_IN_COMMIT_PROC;
+
+    /**
+     * Time spent by a read request in the commit processor.
+     */
+    public final Summary READ_COMMITPROC_TIME;
+
+    /**
+     * Time spent by a write request in the commit processor.
+     */
+    public final Summary WRITE_COMMITPROC_TIME;
+
+    /**
+     * Time spent by a committed request, for a locally issued write, in the
+     * commit processor.
+     */
+    public final Summary LOCAL_WRITE_COMMITTED_TIME;
+
+    /**
+     * Time spent by a committed request for a write, issued by other server, in the
+     * commit processor.
+     */
+    public final Summary SERVER_WRITE_COMMITTED_TIME;
+
+    public final Summary COMMIT_PROCESS_TIME;
+
+
+    /**
+     * Time spent by the final processor. This is tracked in the commit processor.
+     */
+    public final Summary READ_FINAL_PROC_TIME;
+    public final Summary WRITE_FINAL_PROC_TIME;
+
     /*
      * Number of successful matches of expected ensemble name in EnsembleAuthenticationProvider.
      */

+ 98 - 11
zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/CommitProcessor.java

@@ -24,13 +24,14 @@ import java.util.ArrayDeque;
 import java.util.Deque;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.Queue;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.LinkedBlockingQueue;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.apache.zookeeper.common.Time;
 import org.apache.zookeeper.ZooDefs.OpCode;
+import org.apache.zookeeper.server.ServerMetrics;
 import org.apache.zookeeper.server.Request;
 import org.apache.zookeeper.server.RequestProcessor;
 import org.apache.zookeeper.server.WorkerService;
@@ -41,7 +42,7 @@ import org.apache.zookeeper.server.ZooKeeperServerListener;
  * This RequestProcessor matches the incoming committed requests with the
  * locally submitted requests. The trick is that locally submitted requests that
  * change the state of the system will come back as incoming committed requests,
- * so we need to match them up. Instead of just waiting for the committed requests, 
+ * so we need to match them up. Instead of just waiting for the committed requests,
  * we process the uncommitted requests that belong to other sessions.
  *
  * The CommitProcessor is multi-threaded. Communication between threads is
@@ -88,6 +89,16 @@ public class CommitProcessor extends ZooKeeperCriticalThread implements
     protected LinkedBlockingQueue<Request> queuedRequests =
         new LinkedBlockingQueue<Request>();
 
+    /**
+     * The number of read requests currently held in all session queues
+     */
+    private AtomicInteger numReadQueuedRequests = new AtomicInteger(0);
+
+    /**
+     * The number of quorum requests currently held in all session queued
+     */
+    private AtomicInteger numWriteQueuedRequests = new AtomicInteger(0);
+
     /**
      * Requests that have been committed.
      */
@@ -108,7 +119,7 @@ public class CommitProcessor extends ZooKeeperCriticalThread implements
 
     /** For testing purposes, we use a separated stopping condition for the
      * outer loop.*/
-    protected volatile boolean stoppedMainLoop = true; 
+    protected volatile boolean stoppedMainLoop = true;
     protected volatile boolean stopped = true;
     private long workerShutdownTimeoutMS;
     protected WorkerService workerPool;
@@ -146,7 +157,7 @@ public class CommitProcessor extends ZooKeeperCriticalThread implements
             case OpCode.setACL:
                 return true;
             case OpCode.sync:
-                return matchSyncs;    
+                return matchSyncs;
             case OpCode.createSession:
             case OpCode.closeSession:
                 return !request.isLocalSession();
@@ -189,6 +200,13 @@ public class CommitProcessor extends ZooKeeperCriticalThread implements
                         }
                     }
                 }
+
+                ServerMetrics.getMetrics().READS_QUEUED_IN_COMMIT_PROCESSOR.add(numReadQueuedRequests.get());
+                ServerMetrics.getMetrics().WRITES_QUEUED_IN_COMMIT_PROCESSOR.add(numWriteQueuedRequests.get());
+                ServerMetrics.getMetrics().COMMITS_QUEUED_IN_COMMIT_PROCESSOR.add(committedRequests.size());
+
+                long time = Time.currentElapsedTime();
+
                 /*
                  * Processing up to requestsToProcess requests from the incoming
                  * queue (queuedRequests), possibly less if a committed request
@@ -203,10 +221,11 @@ public class CommitProcessor extends ZooKeeperCriticalThread implements
                             || pendingRequests.containsKey(request.sessionId)) {
                         // Add request to pending
                         pendingRequests
-                            .computeIfAbsent(request.sessionId, sid -> new ArrayDeque<>())
-                            .add(request);
-                    }
-                    else {
+                                .computeIfAbsent(request.sessionId, sid -> new ArrayDeque<>())
+                                .add(request);
+                        ServerMetrics.getMetrics().REQUESTS_IN_SESSION_QUEUE.add(pendingRequests.get(request.sessionId).size());
+                    } else {
+                        numReadQueuedRequests.decrementAndGet();
                         sendToNextProcessor(request);
                     }
                     /*
@@ -248,7 +267,9 @@ public class CommitProcessor extends ZooKeeperCriticalThread implements
                      */
                     Deque<Request> sessionQueue = pendingRequests
                             .get(request.sessionId);
+                    ServerMetrics.getMetrics().PENDING_SESSION_QUEUE_SIZE.add(pendingRequests.size());
                     if (sessionQueue != null) {
+                        ServerMetrics.getMetrics().REQUESTS_IN_SESSION_QUEUE.add(sessionQueue.size());
                         // If session queue != null, then it is also not empty.
                         Request topPending = sessionQueue.poll();
                         if (request.cxid != topPending.cxid) {
@@ -296,12 +317,15 @@ public class CommitProcessor extends ZooKeeperCriticalThread implements
                             topPending.setHdr(request.getHdr());
                             topPending.setTxn(request.getTxn());
                             topPending.zxid = request.zxid;
+                            topPending.commitRecvTime = request.commitRecvTime;
                             request = topPending;
+
+                            // Only decrement if we take a request off the queue.
+                            numWriteQueuedRequests.decrementAndGet();
                         }
                     }
 
                     sendToNextProcessor(request);
-
                     waitForEmptyPool();
 
                     /*
@@ -309,16 +333,24 @@ public class CommitProcessor extends ZooKeeperCriticalThread implements
                      * empty.
                      */
                     if (sessionQueue != null) {
+                        int readsAfterWrite = 0;
                         while (!stopped && !sessionQueue.isEmpty()
                                 && !needCommit(sessionQueue.peek())) {
+                            numReadQueuedRequests.decrementAndGet();
                             sendToNextProcessor(sessionQueue.poll());
+                            readsAfterWrite++;
                         }
+                        ServerMetrics.getMetrics().READS_AFTER_WRITE_IN_SESSION_QUEUE.add(readsAfterWrite);
+
                         // Remove empty queues
                         if (sessionQueue.isEmpty()) {
                             pendingRequests.remove(request.sessionId);
                         }
                     }
                 }
+
+                ServerMetrics.getMetrics().COMMIT_PROCESS_TIME.add(Time.currentElapsedTime() - time);
+                endOfIteration();
             } while (!stoppedMainLoop);
         } catch (Throwable e) {
             handleException(this.getName(), e);
@@ -326,12 +358,26 @@ public class CommitProcessor extends ZooKeeperCriticalThread implements
         LOG.info("CommitProcessor exited loop!");
     }
 
-    private void waitForEmptyPool() throws InterruptedException {
+    //for test only
+    protected void endOfIteration() {
+
+    }
+
+    protected void waitForEmptyPool() throws InterruptedException {
+        int numRequestsInProcess = numRequestsProcessing.get();
+        if (numRequestsInProcess != 0) {
+            ServerMetrics.getMetrics().CONCURRENT_REQUEST_PROCESSING_IN_COMMIT_PROCESSOR.add(
+                    numRequestsInProcess);
+        }
+
+        long startWaitTime = Time.currentElapsedTime();
         synchronized(emptyPoolSync) {
             while ((!stopped) && isProcessingRequest()) {
                 emptyPoolSync.wait();
             }
         }
+        ServerMetrics.getMetrics().TIME_WAITING_EMPTY_POOL_IN_COMMIT_PROCESSOR_READ.add(
+                Time.currentElapsedTime() - startWaitTime);
     }
 
     @Override
@@ -385,8 +431,40 @@ public class CommitProcessor extends ZooKeeperCriticalThread implements
 
         public void doWork() throws RequestProcessorException {
             try {
+                if (needCommit(request)) {
+                    if (request.commitProcQueueStartTime != -1 &&
+                            request.commitRecvTime != -1) {
+                        // Locally issued writes.
+                        long currentTime = Time.currentElapsedTime();
+                        ServerMetrics.getMetrics().WRITE_COMMITPROC_TIME.add(currentTime -
+                                request.commitProcQueueStartTime);
+                        ServerMetrics.getMetrics().LOCAL_WRITE_COMMITTED_TIME.add(currentTime -
+                                request.commitRecvTime);
+                    } else if (request.commitRecvTime != -1) {
+                        // Writes issued by other servers.
+                        ServerMetrics.getMetrics().SERVER_WRITE_COMMITTED_TIME.add(
+                                Time.currentElapsedTime() - request.commitRecvTime);
+                    }
+                } else {
+                    if (request.commitProcQueueStartTime != -1) {
+                        ServerMetrics.getMetrics().READ_COMMITPROC_TIME.add(
+                                Time.currentElapsedTime() -
+                                        request.commitProcQueueStartTime);
+                    }
+                }
+
+                long timeBeforeFinalProc = Time.currentElapsedTime();
                 nextProcessor.processRequest(request);
+                if (needCommit(request)) {
+                    ServerMetrics.getMetrics().WRITE_FINAL_PROC_TIME.add(
+                            Time.currentElapsedTime() - timeBeforeFinalProc);
+                } else {
+                    ServerMetrics.getMetrics().READ_FINAL_PROC_TIME.add(
+                            Time.currentElapsedTime() - timeBeforeFinalProc);
+                }
+
             } finally {
+
                 if (numRequestsProcessing.decrementAndGet() == 0){
                     wakeupOnEmpty();
                 }
@@ -404,7 +482,7 @@ public class CommitProcessor extends ZooKeeperCriticalThread implements
             emptyPoolSync.notifyAll();
         }
     }
-    
+
     public void commit(Request request) {
         if (stopped || request == null) {
             return;
@@ -412,6 +490,8 @@ public class CommitProcessor extends ZooKeeperCriticalThread implements
         if (LOG.isDebugEnabled()) {
             LOG.debug("Committing request:: " + request);
         }
+        request.commitRecvTime = Time.currentElapsedTime();
+        ServerMetrics.getMetrics().COMMITS_QUEUED.add(1);
         committedRequests.add(request);
         wakeup();
     }
@@ -424,7 +504,14 @@ public class CommitProcessor extends ZooKeeperCriticalThread implements
         if (LOG.isDebugEnabled()) {
             LOG.debug("Processing request:: " + request);
         }
+        request.commitProcQueueStartTime = Time.currentElapsedTime();
         queuedRequests.add(request);
+        // If the request will block, add it to the queue of blocking requests
+        if (needCommit(request)) {
+            numWriteQueuedRequests.incrementAndGet();
+        } else {
+            numReadQueuedRequests.incrementAndGet();
+        }
         wakeup();
     }
 

+ 499 - 0
zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/CommitProcessorMetricsTest.java

@@ -0,0 +1,499 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.quorum;
+
+import org.apache.zookeeper.ZKTestCase;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.metrics.MetricsUtils;
+import org.apache.zookeeper.server.*;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import static org.hamcrest.Matchers.*;
+
+public class CommitProcessorMetricsTest extends ZKTestCase {
+    protected static final Logger LOG =
+            LoggerFactory.getLogger(CommitProcessorMetricsTest.class);
+    CommitProcessor commitProcessor;
+    DummyFinalProcessor finalProcessor;
+
+    CountDownLatch requestScheduled = null;
+    CountDownLatch requestProcessed = null;
+    CountDownLatch commitSeen = null;
+    CountDownLatch poolEmpytied = null;
+
+    @Before
+    public void setup() {
+        LOG.info("setup");
+        ServerMetrics.getMetrics().resetAll();
+    }
+
+    public void setupProcessors(int commitWorkers, int finalProcTime ) {
+        finalProcessor = new DummyFinalProcessor(finalProcTime);
+        commitProcessor = new TestCommitProcessor(finalProcessor, commitWorkers);
+        commitProcessor.start();
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        LOG.info("tearDown starting");
+
+        commitProcessor.shutdown();
+        commitProcessor.join();
+    }
+
+    private class TestCommitProcessor extends CommitProcessor {
+        int numWorkerThreads;
+
+        public TestCommitProcessor(RequestProcessor finalProcessor, int numWorkerThreads) {
+            super(finalProcessor, "1", true, null);
+            this.numWorkerThreads = numWorkerThreads;
+        }
+
+        @Override
+        public void start() {
+            super.workerPool = new TestWorkerService(numWorkerThreads);
+            super.start();
+            // Since there are two threads--the test thread that puts requests into the queue and the processor
+            // thread (this thread) that removes requests from the queue--the execution order in general is
+            // indeterminate, making it hard to check the test results.
+            //
+            // In some tests, we really want the requests processed one by one. To achieve this, we make sure that
+            // things happen in this order:
+            // processor thread gets into WAITING -> test thread sets requestProcessed latch -> test thread puts
+            // a request into the queue (which wakes up the processor thread in the WAITING state) and waits for
+            // the requestProcessed latch -> the processor thread wakes up and removes the request from the queue and
+            // processes it and opens the requestProcessed latch -> the test thread continues onto the next request
+
+            // So it is important for the processor thread to get into WAITING before any request is put into the queue.
+            // Otherwise, it would miss the wakeup signal and wouldn't process the request or open the latch and the
+            // test thread waiting on the latch would be stuck
+            Thread.State state = super.getState();
+            while (state != State.WAITING) {
+                try {
+                    Thread.sleep(50);
+                } catch (Exception e){
+
+                }
+                state = super.getState();
+            }
+            LOG.info("numWorkerThreads in Test is {}", numWorkerThreads);
+        }
+
+        @Override
+        protected void endOfIteration() {
+            if (requestProcessed != null) {
+                requestProcessed.countDown();
+            }
+        }
+
+        @Override
+        protected void waitForEmptyPool() throws InterruptedException {
+            if (commitSeen != null) {
+                commitSeen.countDown();
+            }
+            super.waitForEmptyPool();
+            if (poolEmpytied != null) {
+                poolEmpytied.countDown();
+            }
+        }
+    }
+
+    private class TestWorkerService extends WorkerService {
+        public TestWorkerService(int numWorkerThreads) {
+            super("CommitProcWork", numWorkerThreads, true);
+        }
+
+        @Override
+        public void schedule(WorkRequest workRequest, long id) {
+            super.schedule(workRequest, id);
+            if (requestScheduled != null) {
+                requestScheduled.countDown();
+            }
+        }
+    }
+
+    private class DummyFinalProcessor implements RequestProcessor {
+        int processTime;
+        public DummyFinalProcessor(int processTime) {
+            this.processTime = processTime;
+        }
+
+        @Override
+        public void processRequest(Request request) {
+            if (processTime > 0) {
+                try {
+                    if (commitSeen != null) {
+                        commitSeen.await(5, TimeUnit.SECONDS);
+                    }
+                    Thread.sleep(processTime);
+                } catch (Exception e) {
+
+                }
+            }
+        }
+
+        @Override
+        public void shutdown(){
+        }
+    }
+
+    private void checkMetrics(String metricName, long min, long max, double avg, long cnt, long sum) {
+        Map<String, Object> values = MetricsUtils.currentServerMetrics();
+
+        Assert.assertEquals("expected min is " + min, min, values.get("min_" + metricName));
+        Assert.assertEquals("expected max is: " + max, max, values.get("max_" + metricName));
+        Assert.assertEquals("expected avg is: " + avg, avg, (Double)values.get("avg_" + metricName), 0.001);
+        Assert.assertEquals("expected cnt is: " + cnt, cnt, values.get("cnt_" + metricName));
+        Assert.assertEquals("expected sum is: " + sum, sum, values.get("sum_" + metricName));
+    }
+
+    private void checkTimeMetric(long actual, long lBoundrary, long hBoundrary) {
+        Assert.assertThat(actual, greaterThanOrEqualTo(lBoundrary));
+        Assert.assertThat(actual, lessThanOrEqualTo(hBoundrary));
+    }
+
+    private Request createReadRequest(long sessionId, int xid) {
+        return new Request(null, sessionId, xid, ZooDefs.OpCode.getData,
+                ByteBuffer.wrap(new byte[10]), null);
+    }
+
+    private Request createWriteRequest(long sessionId, int xid) {
+        return new Request(null, sessionId, xid, ZooDefs.OpCode.setData,
+                ByteBuffer.wrap(new byte[10]), null);
+    }
+
+    private void processRequestWithWait(Request request) throws Exception {
+        requestProcessed = new CountDownLatch(1);
+        commitProcessor.processRequest(request);
+        requestProcessed.await(5, TimeUnit.SECONDS);
+    }
+
+    private void commitWithWait(Request request) throws Exception {
+        requestProcessed = new CountDownLatch(1);
+        commitProcessor.commit(request);
+        requestProcessed.await(5, TimeUnit.SECONDS);
+    }
+
+    @Test
+    public void testRequestsInSessionQueue() throws Exception {
+        setupProcessors(0, 0);
+
+        Request req1 = createWriteRequest(1l, 1);
+        processRequestWithWait(req1);
+
+        checkMetrics("requests_in_session_queue", 1L, 1L, 1D, 1L, 1L);
+
+        //these two read requests will be stuck in the session queue because there is write in front of them
+        processRequestWithWait(createReadRequest(1L, 2));
+        processRequestWithWait(createReadRequest(1L,3));
+
+        checkMetrics("requests_in_session_queue", 1L, 3L, 2D, 3L, 6);
+
+        commitWithWait(req1);
+
+        checkMetrics("requests_in_session_queue", 1L, 3L, 2.25D, 4L, 9);
+    }
+
+    @Test
+    public void testWriteFinalProcTime() throws Exception {
+        setupProcessors(0, 1000);
+
+        Request req1 = createWriteRequest(1l, 2);
+        processRequestWithWait(req1);
+
+        //no request sent to next processor yet
+        Map<String, Object> values = MetricsUtils.currentServerMetrics();
+        Assert.assertEquals(0L, values.get("cnt_write_final_proc_time_ms"));
+
+        commitWithWait(req1);
+
+        values = MetricsUtils.currentServerMetrics();
+        Assert.assertEquals(1L, values.get("cnt_write_final_proc_time_ms"));
+        checkTimeMetric((long)values.get("max_write_final_proc_time_ms"), 1000L, 2000L);
+    }
+
+    @Test
+    public void testReadFinalProcTime() throws Exception {
+        setupProcessors(0, 1000);
+
+        processRequestWithWait(createReadRequest(1L, 1));
+
+        Map<String, Object> values = MetricsUtils.currentServerMetrics();
+        Assert.assertEquals(1L, values.get("cnt_read_final_proc_time_ms"));
+        checkTimeMetric((long)values.get("max_read_final_proc_time_ms"), 1000L, 2000L);
+    }
+
+    @Test
+    public void testCommitProcessTime() throws Exception {
+        setupProcessors(0, 0);
+        processRequestWithWait(createReadRequest(1L, 1));
+
+        Map<String, Object> values = MetricsUtils.currentServerMetrics();
+        Assert.assertEquals(1L, values.get("cnt_commit_process_time"));
+        checkTimeMetric((long)values.get("max_commit_process_time"), 0L, 1000L);
+    }
+
+    @Test
+    public void testServerWriteCommittedTime() throws Exception {
+        setupProcessors(0, 0);
+        //a commit w/o pending request is a write from other servers
+        commitWithWait(createWriteRequest(1L, 1));
+
+        Map<String, Object> values = MetricsUtils.currentServerMetrics();
+        Assert.assertEquals(1L, values.get("cnt_server_write_committed_time_ms"));
+        checkTimeMetric((long)values.get("max_server_write_committed_time_ms"), 0L, 1000L);
+    }
+
+    @Test
+    public void testLocalWriteCommittedTime() throws Exception {
+        setupProcessors(0, 0);
+        Request req1 = createWriteRequest(1l, 2);
+        processRequestWithWait(req1);
+        commitWithWait(req1);
+
+        Map<String, Object> values = MetricsUtils.currentServerMetrics();
+
+        Assert.assertEquals(1L, values.get("cnt_local_write_committed_time_ms"));
+        checkTimeMetric((long)values.get("max_local_write_committed_time_ms"), 0l, 1000l);
+
+        Request req2 = createWriteRequest(1l, 2);
+        processRequestWithWait(req2);
+        //the second write will be stuck in the session queue for at least one second
+        //but the LOCAL_WRITE_COMMITTED_TIME is from when the commit is received
+        Thread.sleep(1000);
+
+        commitWithWait(req2);
+
+        values = MetricsUtils.currentServerMetrics();
+        Assert.assertEquals(2L, values.get("cnt_local_write_committed_time_ms"));
+        checkTimeMetric((long)values.get("max_local_write_committed_time_ms"), 0L, 1000L);
+    }
+
+
+    @Test
+    public void testWriteCommitProcTime() throws Exception {
+        setupProcessors(0, 0);
+        Request req1 = createWriteRequest(1l, 2);
+        processRequestWithWait(req1);
+        commitWithWait(req1);
+
+        Map<String, Object> values = MetricsUtils.currentServerMetrics();
+
+        Assert.assertEquals(1L, values.get("cnt_write_commitproc_time_ms"));
+        checkTimeMetric((long)values.get("max_write_commitproc_time_ms"), 0l, 1000l);
+
+        Request req2 = createWriteRequest(1l, 2);
+        processRequestWithWait(req2);
+        //the second write will be stuck in the session queue for at least one second
+        Thread.sleep(1000);
+
+        commitWithWait(req2);
+
+        values = MetricsUtils.currentServerMetrics();
+        Assert.assertEquals(2L, values.get("cnt_write_commitproc_time_ms"));
+        checkTimeMetric((long)values.get("max_write_commitproc_time_ms"), 1000L, 2000L);
+    }
+
+
+    @Test
+    public void testReadCommitProcTime() throws Exception {
+        setupProcessors(0, 0);
+        processRequestWithWait(createReadRequest(1l, 1));
+
+        Map<String, Object> values = MetricsUtils.currentServerMetrics();
+
+        Assert.assertEquals(1L, values.get("cnt_read_commitproc_time_ms"));
+        checkTimeMetric((long)values.get("max_read_commitproc_time_ms"), 0l, 1000l);
+
+        Request req1 = createWriteRequest(1l, 2);
+        processRequestWithWait(req1);
+        processRequestWithWait(createReadRequest(1l, 3));
+        //the second read will be stuck in the session queue for at least one second
+        Thread.sleep(1000);
+
+        commitWithWait(req1);
+
+        values = MetricsUtils.currentServerMetrics();
+        Assert.assertEquals(2L, values.get("cnt_read_commitproc_time_ms"));
+        checkTimeMetric((long)values.get("max_read_commitproc_time_ms"), 1000L, 2000L);
+    }
+
+
+    @Test
+    public void testTimeWaitingEmptyPoolInCommitProcessorRead() throws Exception {
+        setupProcessors(1, 1000);
+
+        //three read requests will be scheduled first
+        requestScheduled = new CountDownLatch(3);
+        commitProcessor.processRequest(createReadRequest(0l, 2));
+        commitProcessor.processRequest(createReadRequest(1l, 3));
+        commitProcessor.processRequest(createReadRequest(2l, 4));
+        requestScheduled.await(5, TimeUnit.SECONDS);
+
+        //add a commit request to trigger waitForEmptyPool
+        poolEmpytied = new CountDownLatch(1);
+        commitProcessor.commit(createWriteRequest(1l, 1));
+        poolEmpytied.await(5, TimeUnit.SECONDS);
+
+        long actual = (long)MetricsUtils.currentServerMetrics().get("max_time_waiting_empty_pool_in_commit_processor_read_ms");
+        //since each request takes 1000ms to process, so the waiting shouldn't be more than three times of that
+        checkTimeMetric(actual, 2500L, 3500L);
+    }
+
+
+    @Test
+    public void testConcurrentRequestProcessingInCommitProcessor() throws Exception {
+        setupProcessors(3, 1000);
+
+        //three read requests will be processed in parallel
+        commitSeen = new CountDownLatch(1);
+        commitProcessor.processRequest(createReadRequest(1l, 2));
+        commitProcessor.processRequest(createReadRequest(1l, 3));
+        commitProcessor.processRequest(createReadRequest(1l, 4));
+
+        //add a commit request to trigger waitForEmptyPool, which will record number of requests being proccessed
+        poolEmpytied = new CountDownLatch(1);
+        commitProcessor.commit(createWriteRequest(1l, 1));
+        poolEmpytied.await(5, TimeUnit.SECONDS);
+
+        //this will change after we upstream batch write in CommitProcessor
+        Map<String, Object> values = MetricsUtils.currentServerMetrics();
+        Assert.assertEquals(3L, values.get("max_concurrent_request_processing_in_commit_processor"));
+    }
+
+    @Test
+    public void testReadsAfterWriteInSessionQueue() throws Exception {
+        setupProcessors(0, 0);
+        //this read request is before write
+        processRequestWithWait(createReadRequest(1l, 1));
+
+        //one write request
+        Request req1 = createWriteRequest(1l, 1);
+        processRequestWithWait(req1);
+
+
+        //three read requests after the write
+        processRequestWithWait(createReadRequest(1l, 2));
+        processRequestWithWait(createReadRequest(1l, 3));
+        processRequestWithWait(createReadRequest(1l, 4));
+
+        //commit the write
+        commitWithWait(req1);
+
+        checkMetrics("reads_after_write_in_session_queue", 3l, 3l, 3d, 1, 3);
+    }
+
+
+    @Test
+    public void testReadsQueuedInCommitProcessor() throws Exception {
+        setupProcessors(0, 0);
+        processRequestWithWait(createReadRequest(1l, 1));
+        processRequestWithWait(createReadRequest(1l, 2));
+
+        //recorded reads in the queue are 1, 1
+        checkMetrics("read_commit_proc_req_queued", 1l, 1l, 1d, 2, 2);
+    }
+
+    @Test
+    public void testWritesQueuedInCommitProcessor() throws Exception {
+        setupProcessors(0, 0);
+        Request req1 = createWriteRequest(1l, 1);
+        processRequestWithWait(req1);
+        Request req2 = createWriteRequest(1l, 2);
+        processRequestWithWait(req2);
+
+        //since we haven't got any commit request, the write request stays in the queue
+        //recorded writes in the queue are 1, 2
+        checkMetrics("write_commit_proc_req_queued", 1l, 2l, 1.5d, 2, 3);
+
+        commitWithWait(req1);
+
+        //recording is done before commit request is processed, so writes in the queue are: 1, 2, 2
+        checkMetrics("write_commit_proc_req_queued", 1l, 2l, 1.6667d, 3, 5);
+
+        commitWithWait(req2);
+        //writes in the queue are 1, 2, 2, 1
+        checkMetrics("write_commit_proc_req_queued", 1l, 2l, 1.5d, 4, 6);
+
+        //send a read request to trigger the recording, this time the write queue should be empty
+        //writes in the queue are 1, 2, 2, 1, 0
+        processRequestWithWait(createReadRequest(1l, 1));
+
+        checkMetrics("write_commit_proc_req_queued", 0l, 2l, 1.2d, 5, 6);
+    }
+
+    @Test
+    public void testCommitsQueuedInCommitProcessor() throws Exception {
+        setupProcessors(0, 0);
+
+        commitWithWait(createWriteRequest(1l, 1));
+        commitWithWait(createWriteRequest(1l, 2));
+
+        //recorded commits in the queue are 1, 1
+        checkMetrics("commit_commit_proc_req_queued", 1l, 1l, 1d, 2, 2);
+    }
+
+    @Test
+    public void testCommitsQueued() throws Exception {
+        setupProcessors(0, 0);
+
+        commitWithWait(createWriteRequest(1l, 1));
+        commitWithWait(createWriteRequest(1l, 2));
+
+        Map<String, Object> values = MetricsUtils.currentServerMetrics();
+        Assert.assertEquals(2l, (long)values.get("request_commit_queued"));
+    }
+
+    @Test
+    public void testPendingSessionQueueSize() throws Exception {
+        setupProcessors(0, 0);
+
+        //one write request for session 1
+        Request req1 = createWriteRequest(1l, 1);
+        processRequestWithWait(req1);
+
+        //two write requests for session 2
+        Request req2 = createWriteRequest(2l, 2);
+        processRequestWithWait(req2);
+        Request req3 = createWriteRequest(2l, 3);
+        processRequestWithWait(req3);
+
+        commitWithWait(req1);
+        //there are two sessions with pending requests
+        checkMetrics("pending_session_queue_size", 2l, 2l, 2d, 1, 2);
+
+        commitWithWait(req2);
+        //there is on session with pending requests
+        checkMetrics("pending_session_queue_size", 1l, 2l, 1.5d, 2, 3);
+
+        commitWithWait(req3);
+        //there is one session with pending requests
+        checkMetrics("pending_session_queue_size", 1l, 2l, 1.333d, 3, 4);
+    }
+}