Browse Source

ZOOKEEPER-3274: Use CompositeByteBuf to queue data in NettyServerCnxn

This avoids unnecessary buffer copies and resizes.

Author: Ilya Maykov <ilyam@fb.com>

Reviewers: andor@apache.org

Closes #810 from ivmaykov/ZOOKEEPER-3274
Ilya Maykov 6 years ago
parent
commit
5375c25429

+ 27 - 8
zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java

@@ -31,6 +31,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufUtil;
+import io.netty.buffer.CompositeByteBuf;
 import io.netty.buffer.Unpooled;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
@@ -53,7 +54,7 @@ import org.slf4j.LoggerFactory;
 public class NettyServerCnxn extends ServerCnxn {
     private static final Logger LOG = LoggerFactory.getLogger(NettyServerCnxn.class);
     private final Channel channel;
-    private ByteBuf queuedBuffer;
+    private CompositeByteBuf queuedBuffer;
     private final AtomicBoolean throttled = new AtomicBoolean(false);
     private ByteBuffer bb;
     private final ByteBuffer bbLen = ByteBuffer.allocate(4);
@@ -291,6 +292,24 @@ public class NettyServerCnxn extends ServerCnxn {
         }
     }
 
+    /**
+     * Appends <code>buf</code> to <code>queuedBuffer</code>. Does not duplicate <code>buf</code>
+     * or call any flavor of {@link ByteBuf#retain()}. Caller must ensure that <code>buf</code>
+     * is not owned by anyone else, as this call transfers ownership of <code>buf</code> to the
+     * <code>queuedBuffer</code>.
+     *
+     * This method should only be called from the event loop thread.
+     * @param buf the buffer to append to the queue.
+     */
+    private void appendToQueuedBuffer(ByteBuf buf) {
+        checkIsInEventLoop("appendToQueuedBuffer");
+        if (queuedBuffer.numComponents() == queuedBuffer.maxNumComponents()) {
+            // queuedBuffer has reached its component limit, so combine the existing components.
+            queuedBuffer.consolidate();
+        }
+        queuedBuffer.addComponent(true, buf);
+    }
+
     /**
      * Process incoming message. This should only be called from the event
      * loop thread.
@@ -318,9 +337,9 @@ public class NettyServerCnxn extends ServerCnxn {
             // we are throttled, so we need to queue
             if (queuedBuffer == null) {
                 LOG.debug("allocating queue");
-                queuedBuffer = channel.alloc().buffer(buf.readableBytes());
+                queuedBuffer = channel.alloc().compositeBuffer();
             }
-            queuedBuffer.writeBytes(buf);
+            appendToQueuedBuffer(buf.retainedDuplicate());
             if (LOG.isTraceEnabled()) {
                 LOG.trace("0x{} queuedBuffer {}",
                         Long.toHexString(sessionId),
@@ -329,7 +348,7 @@ public class NettyServerCnxn extends ServerCnxn {
         } else {
             LOG.debug("not throttled");
             if (queuedBuffer != null) {
-                queuedBuffer.writeBytes(buf);
+                appendToQueuedBuffer(buf.retainedDuplicate());
                 processQueuedBuffer();
             } else {
                 receiveMessage(buf);
@@ -340,9 +359,9 @@ public class NettyServerCnxn extends ServerCnxn {
                         LOG.trace("Before copy {}", buf);
                     }
                     if (queuedBuffer == null) {
-                        queuedBuffer = channel.alloc().buffer(buf.readableBytes());
+                        queuedBuffer = channel.alloc().compositeBuffer();
                     }
-                    queuedBuffer.writeBytes(buf);
+                    appendToQueuedBuffer(buf.retainedSlice(buf.readerIndex(), buf.readableBytes()));
                     if (LOG.isTraceEnabled()) {
                         LOG.trace("Copy is {}", queuedBuffer);
                         LOG.trace("0x{} queuedBuffer {}",
@@ -375,9 +394,9 @@ public class NettyServerCnxn extends ServerCnxn {
                 releaseQueuedBuffer();
             } else {
                 LOG.debug("Processed queue - bytes remaining");
-                // Possibly reduce memory consumption by freeing up buffer space
+                // Try to reduce memory consumption by freeing up buffer space
                 // which is no longer needed.
-                queuedBuffer.discardSomeReadBytes();
+                queuedBuffer.discardReadComponents();
             }
         } else {
             LOG.debug("queue empty");