Browse Source

ZOOKEEPER-3448: Introduce MessageTracker to assist debug leader and leaner connectivity issues.

We want to have better insight on the state of the world when learners lost connection with leader, so we need capture more information when that happens. We capture more information through MessageTracker which will record the last few sent and received messages at various protocol stage, and these information will be dumped to log files for further analysis.

Author: Michael Han <lhan@twitter.com>
Author: Michael Han <hanm@apache.org>

Reviewers: Enrico Olivelli <eolivelli@apache.org>, Fangmin Lyu <fangmin@apache.org>

Closes #1007 from hanm/twitter/2765eb0629d2f63f07d112270b582e8e931f734f
Michael Han 5 years ago
parent
commit
b5817fbb12

+ 20 - 0
zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md

@@ -1494,6 +1494,26 @@ Both subsystems need to have sufficient amount of threads to achieve peak read t
     minute. This prevents herding during container deletion.
     Default is "10000".
 
+<a name="sc_debug_observability_config"></a>
+
+#### Debug Observability Configurations
+
+**New in 3.6.0:** The following options are introduced to make zookeeper easier to debug.
+
+* *zookeeper.messageTracker.BufferSize* :
+    (Java system property only)
+    Controls the maximum number of messages stored in **MessageTracker**. Value should be positive
+    integers. The default value is 10. **MessageTracker** is introduced in **3.6.0** to record the
+    last set of messages between a server (follower or observer) and a leader, when a server
+    disconnects with leader. These set of messages will then be dumped to zookeeper's log file,
+    and will help reconstruct the state of the servers at the time of the disconnection and
+    will be useful for debugging purpose.
+
+* *zookeeper.messageTracker.Enabled* :
+    (Java system property only)
+    When set to "true", will enable **MessageTracker** to track and record messages. Default value
+    is "false".
+
 <a name="sc_adminserver_config"></a>
 
 #### AdminServer configuration

+ 14 - 0
zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Follower.java

@@ -74,11 +74,16 @@ public class Follower extends Learner {
         self.start_fle = 0;
         self.end_fle = 0;
         fzk.registerJMX(new FollowerBean(this, zk), self.jmxLocalPeerBean);
+
+        long connectionTime = 0;
+        boolean completedSync = false;
+
         try {
             self.setZabState(QuorumPeer.ZabState.DISCOVERY);
             QuorumServer leaderServer = findLeader();
             try {
                 connectToLeader(leaderServer.addr, leaderServer.hostname);
+                connectionTime = System.currentTimeMillis();
                 long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO);
                 if (self.isReconfigStateChange()) {
                     throw new Exception("learned about role change");
@@ -99,6 +104,7 @@ public class Follower extends Learner {
                     self.setZabState(QuorumPeer.ZabState.SYNCHRONIZATION);
                     syncWithLeader(newEpochZxid);
                     self.setZabState(QuorumPeer.ZabState.BROADCAST);
+                    completedSync = true;
                 } finally {
                     long syncTime = Time.currentElapsedTime() - startTime;
                     ServerMetrics.getMetrics().FOLLOWER_SYNC_TIME.add(syncTime);
@@ -129,6 +135,14 @@ public class Follower extends Learner {
                 om.stop();
             }
             zk.unregisterJMX(this);
+
+            if (connectionTime != 0) {
+                long connectionDuration = System.currentTimeMillis() - connectionTime;
+                LOG.info("Disconnected from leader (with address: {}). "
+                        + "Was connected for {}ms. Sync state: {}",
+                    leaderAddr, connectionDuration, completedSync);
+                messageTracker.dumpToLog(leaderAddr.toString());
+            }
         }
     }
 

+ 8 - 0
zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java

@@ -47,6 +47,7 @@ import org.apache.zookeeper.server.ServerCnxn;
 import org.apache.zookeeper.server.ZooTrace;
 import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
 import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
+import org.apache.zookeeper.server.util.MessageTracker;
 import org.apache.zookeeper.server.util.SerializeUtils;
 import org.apache.zookeeper.server.util.ZxidUtils;
 import org.apache.zookeeper.txn.SetDataTxn;
@@ -74,6 +75,7 @@ public class Learner {
     protected BufferedOutputStream bufferedOutput;
 
     protected Socket sock;
+    protected InetSocketAddress leaderAddr;
 
     /**
      * Socket getter
@@ -88,6 +90,9 @@ public class Learner {
     /** the protocol version of the leader */
     protected int leaderProtocolVersion = 0x01;
 
+    private static final int BUFFERED_MESSAGE_SIZE = 10;
+    protected final MessageTracker messageTracker = new MessageTracker(BUFFERED_MESSAGE_SIZE);
+
     protected static final Logger LOG = LoggerFactory.getLogger(Learner.class);
 
     /**
@@ -146,6 +151,7 @@ public class Learner {
     void writePacket(QuorumPacket pp, boolean flush) throws IOException {
         synchronized (leaderOs) {
             if (pp != null) {
+                messageTracker.trackSent(pp.getType());
                 leaderOs.writeRecord(pp, "packet");
             }
             if (flush) {
@@ -164,6 +170,7 @@ public class Learner {
     void readPacket(QuorumPacket pp) throws IOException {
         synchronized (leaderIs) {
             leaderIs.readRecord(pp, "packet");
+            messageTracker.trackReceived(pp.getType());
         }
         long traceMask = ZooTrace.SERVER_PACKET_TRACE_MASK;
         if (pp.getType() == Leader.PING) {
@@ -250,6 +257,7 @@ public class Learner {
      */
     protected void connectToLeader(InetSocketAddress addr, String hostname) throws IOException, InterruptedException, X509Exception {
         this.sock = createSocket();
+        this.leaderAddr = addr;
 
         // leader connection timeout defaults to tickTime * initLimit
         int connectTimeout = self.tickTime * self.initLimit;

+ 18 - 1
zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerHandler.java

@@ -49,6 +49,7 @@ import org.apache.zookeeper.server.ZooTrace;
 import org.apache.zookeeper.server.quorum.Leader.Proposal;
 import org.apache.zookeeper.server.quorum.QuorumPeer.LearnerType;
 import org.apache.zookeeper.server.quorum.auth.QuorumAuthServer;
+import org.apache.zookeeper.server.util.MessageTracker;
 import org.apache.zookeeper.server.util.SerializeUtils;
 import org.apache.zookeeper.server.util.ZxidUtils;
 import org.apache.zookeeper.txn.TxnHeader;
@@ -220,6 +221,8 @@ public class LearnerHandler extends ZooKeeperThread {
     private final BufferedInputStream bufferedInput;
     private BufferedOutputStream bufferedOutput;
 
+    protected final MessageTracker messageTracker;
+
     // for test only
     protected void setOutputArchive(BinaryOutputArchive oa) {
         this.oa = oa;
@@ -280,6 +283,8 @@ public class LearnerHandler extends ZooKeeperThread {
             }
             throw new SaslException("Authentication failure: " + e.getMessage());
         }
+
+        this.messageTracker = new MessageTracker(MessageTracker.BUFFERED_MESSAGE_SIZE);
     }
 
     @Override
@@ -349,6 +354,7 @@ public class LearnerHandler extends ZooKeeperThread {
                 }
                 oa.writeRecord(p, "packet");
                 packetsSent.incrementAndGet();
+                messageTracker.trackSent(p.getType());
             } catch (IOException e) {
                 if (!sock.isClosed()) {
                     LOG.warn("Unexpected exception at " + this, e);
@@ -464,8 +470,11 @@ public class LearnerHandler extends ZooKeeperThread {
 
             QuorumPacket qp = new QuorumPacket();
             ia.readRecord(qp, "packet");
+
+            messageTracker.trackReceived(qp.getType());
             if (qp.getType() != Leader.FOLLOWERINFO && qp.getType() != Leader.OBSERVERINFO) {
                 LOG.error("First packet " + qp.toString() + " is not FOLLOWERINFO or OBSERVERINFO!");
+
                 return;
             }
 
@@ -526,9 +535,11 @@ public class LearnerHandler extends ZooKeeperThread {
                 ByteBuffer.wrap(ver).putInt(0x10000);
                 QuorumPacket newEpochPacket = new QuorumPacket(Leader.LEADERINFO, newLeaderZxid, ver, null);
                 oa.writeRecord(newEpochPacket, "packet");
+                messageTracker.trackSent(Leader.LEADERINFO);
                 bufferedOutput.flush();
                 QuorumPacket ackEpochPacket = new QuorumPacket();
                 ia.readRecord(ackEpochPacket, "packet");
+                messageTracker.trackReceived(ackEpochPacket.getType());
                 if (ackEpochPacket.getType() != Leader.ACKEPOCH) {
                     LOG.error(ackEpochPacket.toString() + " is not ACKEPOCH");
                     return;
@@ -554,6 +565,7 @@ public class LearnerHandler extends ZooKeeperThread {
                 try {
                     long zxidToSend = learnerMaster.getZKDatabase().getDataTreeLastProcessedZxid();
                     oa.writeRecord(new QuorumPacket(Leader.SNAP, zxidToSend, null, null), "packet");
+                    messageTracker.trackSent(Leader.SNAP);
                     bufferedOutput.flush();
 
                     LOG.info("Sending snapshot last zxid of peer is 0x{}, zxid of leader is 0x{}, "
@@ -600,6 +612,8 @@ public class LearnerHandler extends ZooKeeperThread {
              */
             qp = new QuorumPacket();
             ia.readRecord(qp, "packet");
+
+            messageTracker.trackReceived(qp.getType());
             if (qp.getType() != Leader.ACK) {
                 LOG.error("Next packet was supposed to be an ACK," + " but received packet: {}", packetToString(qp));
                 return;
@@ -632,6 +646,7 @@ public class LearnerHandler extends ZooKeeperThread {
             while (true) {
                 qp = new QuorumPacket();
                 ia.readRecord(qp, "packet");
+                messageTracker.trackReceived(qp.getType());
 
                 long traceMask = ZooTrace.SERVER_PACKET_TRACE_MASK;
                 if (qp.getType() == Leader.PING) {
@@ -716,7 +731,9 @@ public class LearnerHandler extends ZooKeeperThread {
                 syncThrottler.endSync();
                 syncThrottler = null;
             }
-            LOG.warn("******* GOODBYE {} ********", getRemoteAddress());
+            String remoteAddr = getRemoteAddress();
+            LOG.warn("******* GOODBYE {} ********", remoteAddr);
+            messageTracker.dumpToLog(remoteAddr);
             shutdown();
         }
     }

+ 12 - 1
zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Observer.java

@@ -97,12 +97,14 @@ public class Observer extends Learner {
      */
     void observeLeader() throws Exception {
         zk.registerJMX(new ObserverBean(this, zk), self.jmxLocalPeerBean);
-
+        long connectTime = 0;
+        boolean completedSync = false;
         try {
             self.setZabState(QuorumPeer.ZabState.DISCOVERY);
             QuorumServer master = findLearnerMaster();
             try {
                 connectToLeader(master.addr, master.hostname);
+                connectTime = System.currentTimeMillis();
                 long newLeaderZxid = registerWithLeader(Leader.OBSERVERINFO);
                 if (self.isReconfigStateChange()) {
                     throw new Exception("learned about role change");
@@ -112,6 +114,7 @@ public class Observer extends Learner {
                 self.setZabState(QuorumPeer.ZabState.SYNCHRONIZATION);
                 syncWithLeader(newLeaderZxid);
                 self.setZabState(QuorumPeer.ZabState.BROADCAST);
+                completedSync = true;
                 QuorumPacket qp = new QuorumPacket();
                 while (this.isRunning() && nextLearnerMaster.get() == null) {
                     readPacket(qp);
@@ -127,6 +130,14 @@ public class Observer extends Learner {
         } finally {
             currentLearnerMaster = null;
             zk.unregisterJMX(this);
+            if (connectTime != 0) {
+                long connectionDuration = System.currentTimeMillis() - connectTime;
+
+                LOG.info("Disconnected from leader (with address: {}). "
+                        + "Was connected for {}ms. Sync state: {}",
+                    leaderAddr, connectionDuration, completedSync);
+                messageTracker.dumpToLog(leaderAddr.toString());
+            }
         }
     }
 

+ 103 - 0
zookeeper-server/src/main/java/org/apache/zookeeper/server/util/CircularBuffer.java

@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.util;
+
+import java.lang.reflect.Array;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Thread safe FIFO CircularBuffer implementation.
+ * When the buffer is full write operation overwrites the oldest element.
+ *
+ * Fun thing @todo, make this lock free as this is called on every quorum message
+ */
+public class CircularBuffer<T> {
+
+    private final T[] buffer;
+    private final int capacity;
+    private int oldest;
+    private AtomicInteger numberOfElements = new AtomicInteger();
+
+    @SuppressWarnings("unchecked")
+    public CircularBuffer(Class<T> clazz, int capacity) {
+        if (capacity <= 0) {
+            throw new IllegalArgumentException("CircularBuffer capacity should be greater than 0");
+        }
+        this.buffer = (T[]) Array.newInstance(clazz, capacity);
+        this.capacity = capacity;
+    }
+
+    /**
+     * Puts elements in the next available index in the array.
+     * If the array is full the oldest element is replaced with
+     * the new value.
+     * @param element
+     */
+    public synchronized void write(T element) {
+        int newSize = numberOfElements.incrementAndGet();
+        if (newSize > capacity) {
+            buffer[oldest] = element;
+            oldest = ++oldest % capacity;
+            numberOfElements.decrementAndGet();
+        } else {
+            int index = (oldest + numberOfElements.get() - 1) % capacity;
+            buffer[index] = element;
+        }
+    }
+
+    /**
+     * Reads from the buffer in a FIFO manner.
+     * Returns the oldest element in the buffer if the buffer ie not empty
+     * Returns null if the buffer is empty
+     * @return
+     */
+    public synchronized T take() {
+        int newSize = numberOfElements.decrementAndGet();
+        if (newSize < 0) {
+            numberOfElements.incrementAndGet();
+            return null;
+        }
+        T polled = buffer[oldest];
+        oldest = ++oldest % capacity;
+        return polled;
+    }
+
+    public synchronized T peek() {
+        if (numberOfElements.get() <= 0) {
+            return null;
+        }
+        return buffer[oldest];
+    }
+
+    public int size() {
+        return numberOfElements.get();
+    }
+
+    public boolean isEmpty() {
+        return numberOfElements.get() <= 0;
+    }
+
+    public boolean isFull() {
+        return numberOfElements.get() >= capacity;
+    }
+
+    public synchronized void  reset() {
+        numberOfElements.set(0);
+    }
+}

+ 165 - 0
zookeeper-server/src/main/java/org/apache/zookeeper/server/util/MessageTracker.java

@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.util;
+
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import org.apache.zookeeper.server.quorum.Leader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * This class provides a way of buffering sentBuffer and receivedBuffer messages in order.
+ * It uses EvictingQueue of size BUFFERED_MESSAGE_SIZE to store the messages.
+ * When the queue is full it overrides the oldest in a circular manner.
+ * This class does doe not provide thread safety.
+ */
+public class MessageTracker {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MessageTracker.class);
+
+    private final CircularBuffer<BufferedMessage> sentBuffer;
+    private final CircularBuffer<BufferedMessage> receivedBuffer;
+
+    public static final String MESSAGE_TRACKER_BUFFER_SIZE = "zookeeper.messageTracker.BufferSize";
+    public static final String MESSAGE_TRACKER_ENABLED = "zookeeper.messageTracker.Enabled";
+    public static final int BUFFERED_MESSAGE_SIZE;
+    private static final boolean enabled;
+    static {
+        BUFFERED_MESSAGE_SIZE = Integer.getInteger(MESSAGE_TRACKER_BUFFER_SIZE, 10);
+        enabled = Boolean.getBoolean(MESSAGE_TRACKER_ENABLED);
+    }
+
+    public MessageTracker(int buffer_size) {
+        this.sentBuffer = new CircularBuffer<>(BufferedMessage.class, buffer_size);
+        this.receivedBuffer = new CircularBuffer<>(BufferedMessage.class, buffer_size);
+    }
+
+    public void trackSent(long timestamp) {
+        if (enabled) {
+            sentBuffer.write(new BufferedMessage(timestamp));
+        }
+    }
+
+    public void trackSent(int packetType) {
+        if (enabled) {
+            sentBuffer.write(new BufferedMessage(packetType));
+        }
+    }
+
+    public void trackReceived(long timestamp) {
+        if (enabled) {
+            receivedBuffer.write(new BufferedMessage(timestamp));
+        }
+    }
+
+    public void trackReceived(int packetType) {
+        if (enabled) {
+            receivedBuffer.write(new BufferedMessage(packetType));
+        }
+    }
+
+    public final BufferedMessage peekSent() {
+        return sentBuffer.peek();
+    }
+
+    public final BufferedMessage peekReceived() {
+        return receivedBuffer.peek();
+    }
+
+    public final long peekSentTimestamp() {
+        return enabled ? sentBuffer.peek().getTimestamp() : 0;
+    }
+
+    public final long peekReceivedTimestamp() {
+        return enabled ? receivedBuffer.peek().getTimestamp() : 0;
+    }
+
+    public void dumpToLog(String serverAddress) {
+        if (!enabled) {
+            return;
+        }
+        logMessages(serverAddress, receivedBuffer, Direction.RECEIVED);
+        logMessages(serverAddress, sentBuffer, Direction.SENT);
+    }
+
+    private static void logMessages(
+        String serverAddr,
+        CircularBuffer<BufferedMessage> messages,
+        Direction direction) {
+        String sentOrReceivedText = direction == Direction.SENT ? "sentBuffer to" : "receivedBuffer from";
+
+        if (messages.isEmpty()) {
+            LOG.info("No buffered timestamps for messages {} {}", sentOrReceivedText, serverAddr);
+        } else {
+            LOG.warn("Last {} timestamps for messages {} {}:",
+                messages.size(), sentOrReceivedText, serverAddr);
+            while (!messages.isEmpty()) {
+                LOG.warn("{} {}  {}",
+                    sentOrReceivedText,
+                    serverAddr,
+                    messages.take().toString());
+            }
+        }
+    }
+
+    /**
+     * Direction for message track.
+     */
+    private enum Direction {
+        SENT, RECEIVED
+    }
+
+    private static class BufferedMessage {
+
+        private long timestamp;
+        private int messageType;
+
+        private long getTimestamp() {
+            return timestamp;
+        }
+
+        BufferedMessage(int messageType) {
+            this.messageType = messageType;
+            this.timestamp = System.currentTimeMillis();
+        }
+
+        BufferedMessage(long timestamp) {
+            this.messageType = -1;
+            this.timestamp = timestamp;
+        }
+
+        @Override
+        /**
+         * ToString examples are as follows:
+         * TimeStamp: 2016-06-06 11:07:58,594 Type: PROPOSAL
+         * TimeStamp: 2016-06-06 11:07:58,187
+         */
+        public String toString() {
+            if (messageType == -1) {
+                return "TimeStamp: " + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS")
+                    .format(new Date(timestamp));
+            } else {
+                return "TimeStamp: " + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS")
+                    .format(new Date(timestamp)) + " Type: " + Leader.getPacketType(messageType);
+            }
+        }
+    }
+}

+ 198 - 0
zookeeper-server/src/test/java/org/apache/zookeeper/server/util/CircularBufferTest.java

@@ -0,0 +1,198 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.util;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class CircularBufferTest {
+
+    @Test
+    public void testCircularBuffer() {
+        final int capacity = 3;
+        CircularBuffer<String> buffer = new CircularBuffer<>(String.class, capacity);
+
+        Assert.assertTrue(buffer.isEmpty());
+        Assert.assertFalse(buffer.isFull());
+
+        // write to the buffer
+        buffer.write("A");
+        Assert.assertFalse(buffer.isEmpty());
+        Assert.assertFalse(buffer.isFull());
+
+        buffer.write("B");
+        Assert.assertFalse(buffer.isEmpty());
+        Assert.assertFalse(buffer.isFull());
+
+        buffer.write("C");
+        Assert.assertFalse(buffer.isEmpty());
+        Assert.assertTrue(buffer.isFull());
+
+        // Buffer is full.
+        // Read from buffer
+        Assert.assertEquals("A", buffer.take());
+        Assert.assertFalse(buffer.isEmpty());
+        Assert.assertFalse(buffer.isFull());
+
+        Assert.assertEquals("B", buffer.take());
+        Assert.assertFalse(buffer.isEmpty());
+        Assert.assertFalse(buffer.isFull());
+
+        Assert.assertEquals("C", buffer.take());
+        Assert.assertTrue(buffer.isEmpty());
+        Assert.assertFalse(buffer.isFull());
+
+        // write to the buffer
+        buffer.write("1");
+        Assert.assertFalse(buffer.isEmpty());
+        Assert.assertFalse(buffer.isFull());
+
+        buffer.write("2");
+        Assert.assertFalse(buffer.isEmpty());
+        Assert.assertFalse(buffer.isFull());
+
+        buffer.write("3");
+        Assert.assertFalse(buffer.isEmpty());
+        Assert.assertTrue(buffer.isFull());
+
+        buffer.write("4"); // 4 overwrites 1
+        Assert.assertFalse(buffer.isEmpty());
+        Assert.assertTrue(buffer.isFull());
+
+        // Buffer if full
+        // Read from buffer
+        Assert.assertEquals("2", buffer.take());
+        Assert.assertFalse(buffer.isEmpty());
+        Assert.assertFalse(buffer.isFull());
+
+        Assert.assertEquals("3", buffer.take());
+        Assert.assertFalse(buffer.isEmpty());
+        Assert.assertFalse(buffer.isFull());
+
+        Assert.assertEquals("4", buffer.take());
+        Assert.assertTrue(buffer.isEmpty());
+        Assert.assertFalse(buffer.isFull());
+
+        // write to the buffer
+        buffer.write("a");
+        Assert.assertFalse(buffer.isEmpty());
+        Assert.assertFalse(buffer.isFull());
+
+        buffer.write("b");
+        Assert.assertFalse(buffer.isEmpty());
+        Assert.assertFalse(buffer.isFull());
+
+        buffer.write("c");
+        Assert.assertFalse(buffer.isEmpty());
+        Assert.assertTrue(buffer.isFull());
+
+        buffer.write("d"); // d overwrites a
+        Assert.assertFalse(buffer.isEmpty());
+        Assert.assertTrue(buffer.isFull());
+
+        buffer.write("e"); // e overwrites b
+        Assert.assertFalse(buffer.isEmpty());
+        Assert.assertTrue(buffer.isFull());
+
+        buffer.write("f"); // f overwrites c
+        Assert.assertFalse(buffer.isEmpty());
+        Assert.assertTrue(buffer.isFull());
+
+        buffer.write("g"); // g overwrites d
+        Assert.assertFalse(buffer.isEmpty());
+        Assert.assertTrue(buffer.isFull());
+
+        // Buffer is full.
+        // Read from buffer
+        Assert.assertEquals("e", buffer.take());
+        Assert.assertFalse(buffer.isEmpty());
+        Assert.assertFalse(buffer.isFull());
+
+        Assert.assertEquals("f", buffer.take());
+        Assert.assertFalse(buffer.isEmpty());
+        Assert.assertFalse(buffer.isFull());
+
+        Assert.assertEquals("g", buffer.take());
+        Assert.assertTrue(buffer.isEmpty());
+        Assert.assertFalse(buffer.isFull());
+    }
+
+    @Test
+    public void testCircularBufferWithCapacity1() {
+        final int capacity = 1;
+        CircularBuffer<String> buffer = new CircularBuffer<>(String.class, capacity);
+
+        Assert.assertTrue(buffer.isEmpty());
+        Assert.assertFalse(buffer.isFull());
+
+        // write to the buffer
+        buffer.write("A");
+        Assert.assertFalse(buffer.isEmpty());
+        Assert.assertTrue(buffer.isFull());
+
+        buffer.write("B"); // B overwrite A
+        Assert.assertFalse(buffer.isEmpty());
+        Assert.assertTrue(buffer.isFull());
+
+        // Buffer is full.
+        // Read from buffer
+        Assert.assertEquals("B", buffer.take());
+        Assert.assertTrue(buffer.isEmpty());
+        Assert.assertFalse(buffer.isFull());
+    }
+
+    @Test
+    public void testCircularBufferReset() {
+        final int capacity = 3;
+        CircularBuffer<String> buffer = new CircularBuffer<>(String.class, capacity);
+
+        Assert.assertTrue(buffer.isEmpty());
+        Assert.assertFalse(buffer.isFull());
+
+        // write to the buffer
+        buffer.write("A");
+        Assert.assertFalse(buffer.isEmpty());
+        Assert.assertFalse(buffer.isFull());
+        Assert.assertEquals(1, buffer.size());
+        Assert.assertEquals("A", buffer.peek());
+
+        buffer.write("B");
+        Assert.assertFalse(buffer.isEmpty());
+        Assert.assertFalse(buffer.isFull());
+        Assert.assertEquals(2, buffer.size());
+        Assert.assertEquals("A", buffer.peek());
+
+        // reset
+        buffer.reset();
+        Assert.assertNull(buffer.peek());
+        Assert.assertTrue(buffer.isEmpty());
+        Assert.assertFalse(buffer.isFull());
+        Assert.assertEquals(0, buffer.size());
+    }
+
+    @Test
+    public void testCircularBufferIllegalCapacity() {
+        try {
+            CircularBuffer<String> buffer = new CircularBuffer<>(String.class, 0);
+            Assert.fail();
+        } catch (IllegalArgumentException e) {
+            Assert.assertEquals("CircularBuffer capacity should be greater than 0", e.getMessage());
+        }
+    }
+}

+ 129 - 0
zookeeper-server/src/test/java/org/apache/zookeeper/server/util/MessageTrackerTest.java

@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.util;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MessageTrackerTest {
+    private static final int BUFFERED_MESSAGE_SIZE = 5;
+    private static final Logger LOG = LoggerFactory.getLogger(MessageTrackerTest.class);
+
+    @Before
+    public void setup() {
+        System.setProperty(MessageTracker.MESSAGE_TRACKER_ENABLED, "true");
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        System.clearProperty(MessageTracker.MESSAGE_TRACKER_ENABLED);
+    }
+
+    @Test
+    public void testTrackSend() throws InterruptedException {
+        long timestamp1 = System.currentTimeMillis();
+        MessageTracker messageTracker = new MessageTracker(BUFFERED_MESSAGE_SIZE);
+
+        // First timestamp is added
+        messageTracker.trackSent(timestamp1);
+        Assert.assertEquals(messageTracker.peekSentTimestamp(), timestamp1);
+
+        Thread.sleep(2);
+
+        // Second timestamp is added
+        long timestamp2 = System.currentTimeMillis();
+        messageTracker.trackSent(timestamp2);
+        Assert.assertEquals(messageTracker.peekSentTimestamp(), timestamp1);
+    }
+
+    @Test
+    public void testTrackReceived() throws InterruptedException {
+        long timestamp1 = System.currentTimeMillis();
+        MessageTracker messageTracker = new MessageTracker(BUFFERED_MESSAGE_SIZE);
+
+        // First timestamp is added
+        messageTracker.trackReceived(timestamp1);
+        Assert.assertEquals(messageTracker.peekReceivedTimestamp(), timestamp1);
+
+        Thread.sleep(2);
+
+        // Second timestamp is added
+        long timestamp2 = System.currentTimeMillis();
+        messageTracker.trackReceived(timestamp2);
+        Assert.assertEquals(messageTracker.peekReceivedTimestamp(), timestamp1);
+    }
+
+    @Test
+    public void testMessageTrackerFull() throws InterruptedException {
+        MessageTracker messageTracker = new MessageTracker(BUFFERED_MESSAGE_SIZE);
+
+        // Add up to capacity + 1
+        long timestampSent = 0;
+        long timestampReceived = 0;
+        for (int i = 0; i <= BUFFERED_MESSAGE_SIZE; i++) {
+            if (i == 1) {
+                timestampSent = System.currentTimeMillis();
+                messageTracker.trackSent(timestampSent);
+                Thread.sleep(2);
+                timestampReceived = System.currentTimeMillis();
+                messageTracker.trackReceived(timestampReceived);
+            } else {
+                messageTracker.trackSent(System.currentTimeMillis());
+                messageTracker.trackReceived(System.currentTimeMillis());
+            }
+            Thread.sleep(1);
+        }
+
+        Assert.assertEquals(messageTracker.peekSentTimestamp(), timestampSent);
+        Assert.assertEquals(messageTracker.peekReceivedTimestamp(), timestampReceived);
+    }
+
+    @Test
+    public void testDumpToLog() {
+        long timestamp1 = System.currentTimeMillis();
+        MessageTracker messageTracker = new MessageTracker(BUFFERED_MESSAGE_SIZE);
+        String sid = "127.0.0.1";
+
+        // MessageTracker is empty
+        messageTracker.dumpToLog(sid);
+        Assert.assertNull(messageTracker.peekSent());
+        Assert.assertNull(messageTracker.peekReceived());
+
+        // There is 1 sent and 0 received
+        messageTracker.trackSent(timestamp1);
+        Assert.assertEquals(messageTracker.peekSentTimestamp(), timestamp1);
+        Assert.assertNull(messageTracker.peekReceived());
+        messageTracker.dumpToLog(sid);
+        Assert.assertNull(messageTracker.peekSent());
+        Assert.assertNull(messageTracker.peekReceived());
+
+        // There is 1 sent and 1 received
+        messageTracker.trackSent(timestamp1);
+        messageTracker.trackReceived(timestamp1);
+        Assert.assertEquals(messageTracker.peekSentTimestamp(), timestamp1);
+        Assert.assertEquals(messageTracker.peekReceivedTimestamp(), timestamp1);
+        messageTracker.dumpToLog(sid);
+        Assert.assertNull(messageTracker.peekSent());
+        Assert.assertNull(messageTracker.peekReceived());
+    }
+}