浏览代码

ZOOKEEPER-3272: Clean up netty4 code per Norman Maurer's review comments

Netty4 code clean-up per the discussion in PR #753.

Author: Ilya Maykov <ilyam@fb.com>

Reviewers: norman_maurer@apple.com, andor@apache.org

Closes #809 from ivmaykov/ZOOKEEPER-3272 and squashes the following commits:

26d2b338f [Ilya Maykov] Merge branch 'master' into ZOOKEEPER-3272
201449641 [Ilya Maykov] Address code review comments
a162c54b0 [Ilya Maykov] ZOOKEEPER-3272: Clean up netty4 code per Norman Maurer's review comments
Ilya Maykov 6 年之前
父节点
当前提交
c2b63926c1

+ 28 - 15
zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxnSocketNetty.java

@@ -49,6 +49,8 @@ import io.netty.channel.EventLoopGroup;
 import io.netty.channel.SimpleChannelInboundHandler;
 import io.netty.channel.socket.SocketChannel;
 import io.netty.handler.ssl.SslHandler;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.GenericFutureListener;
 import org.apache.zookeeper.ClientCnxn.EndOfStreamException;
 import org.apache.zookeeper.ClientCnxn.Packet;
 import org.apache.zookeeper.client.ZKClientConfig;
@@ -82,7 +84,9 @@ public class ClientCnxnSocketNetty extends ClientCnxnSocket {
 
     ClientCnxnSocketNetty(ZKClientConfig clientConfig) throws IOException {
         this.clientConfig = clientConfig;
-        eventLoopGroup = NettyUtils.newNioOrEpollEventLoopGroup();
+        // Client only has 1 outgoing socket, so the event loop group only needs
+        // a single thread.
+        eventLoopGroup = NettyUtils.newNioOrEpollEventLoopGroup(1 /* nThreads */);
         initProperties();
     }
 
@@ -143,6 +147,7 @@ public class ClientCnxnSocketNetty extends ClientCnxnSocket {
                 @Override
                 public void operationComplete(ChannelFuture channelFuture) throws Exception {
                     // this lock guarantees that channel won't be assigned after cleanup().
+                    boolean connected = false;
                     connectLock.lock();
                     try {
                         if (!channelFuture.isSuccess()) {
@@ -175,10 +180,13 @@ public class ClientCnxnSocketNetty extends ClientCnxnSocket {
                         } else {
                             needSasl.set(false);
                         }
-                        LOG.info("channel is connected: {}", channelFuture.channel());
+                        connected = true;
                     } finally {
                         connectFuture = null;
                         connectLock.unlock();
+                        if (connected) {
+                            LOG.info("channel is connected: {}", channelFuture.channel());
+                        }
                         // need to wake on connect success or failure to avoid
                         // timing out ClientCnxn.SendThread which may be
                         // blocked waiting for first connect in doTransport().
@@ -218,9 +226,7 @@ public class ClientCnxnSocketNetty extends ClientCnxnSocket {
 
     @Override
     void close() {
-        if (!eventLoopGroup.isShuttingDown()) {
-            eventLoopGroup.shutdownGracefully();
-        }
+        eventLoopGroup.shutdownGracefully();
     }
 
     @Override
@@ -318,20 +324,23 @@ public class ClientCnxnSocketNetty extends ClientCnxnSocket {
         return sendPkt(p, false);
     }
 
+    // Use a single listener instance to reduce GC
+    private final GenericFutureListener<Future<Void>> onSendPktDoneListener = f -> {
+        if (f.isSuccess()) {
+            sentCount.getAndIncrement();
+        }
+    };
+
     private ChannelFuture sendPkt(Packet p, boolean doFlush) {
         // Assuming the packet will be sent out successfully. Because if it fails,
         // the channel will close and clean up queues.
         p.createBB();
         updateLastSend();
-        ChannelFuture result = channel.write(Unpooled.wrappedBuffer(p.bb));
-        result.addListener(f -> {
-            if (f.isSuccess()) {
-                sentCount.getAndIncrement();
-            }
-        });
-        if (doFlush) {
-            channel.flush();
-        }
+        final ByteBuf writeBuffer = Unpooled.wrappedBuffer(p.bb);
+        final ChannelFuture result = doFlush
+                ? channel.writeAndFlush(writeBuffer)
+                : channel.write(writeBuffer);
+        result.addListener(onSendPktDoneListener);
         return result;
     }
 
@@ -345,6 +354,7 @@ public class ClientCnxnSocketNetty extends ClientCnxnSocket {
      */
     private void doWrite(List<Packet> pendingQueue, Packet p, ClientCnxn cnxn) {
         updateNow();
+        boolean anyPacketsSent = false;
         while (true) {
             if (p != WakeupPacket.getInstance()) {
                 if ((p.requestHeader != null) &&
@@ -356,6 +366,7 @@ public class ClientCnxnSocketNetty extends ClientCnxnSocket {
                     }
                 }
                 sendPktOnly(p);
+                anyPacketsSent = true;
             }
             if (outgoingQueue.isEmpty()) {
                 break;
@@ -364,7 +375,9 @@ public class ClientCnxnSocketNetty extends ClientCnxnSocket {
         }
         // TODO: maybe we should flush in the loop above every N packets/bytes?
         // But, how do we determine the right value for N ...
-        channel.flush();
+        if (anyPacketsSent) {
+            channel.flush();
+        }
     }
 
     @Override

+ 89 - 1
zookeeper-server/src/main/java/org/apache/zookeeper/common/NettyUtils.java

@@ -18,6 +18,15 @@
 
 package org.apache.zookeeper.common;
 
+import java.net.InetAddress;
+import java.net.NetworkInterface;
+import java.net.SocketException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.HashSet;
+import java.util.Set;
+
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.epoll.Epoll;
 import io.netty.channel.epoll.EpollEventLoopGroup;
@@ -28,15 +37,22 @@ import io.netty.channel.socket.ServerSocketChannel;
 import io.netty.channel.socket.SocketChannel;
 import io.netty.channel.socket.nio.NioServerSocketChannel;
 import io.netty.channel.socket.nio.NioSocketChannel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Helper methods for netty code.
  */
 public class NettyUtils {
+    private static final Logger LOG = LoggerFactory.getLogger(NettyUtils.class);
+
+    private static final int DEFAULT_INET_ADDRESS_COUNT = 1;
+
     /**
      * If {@link Epoll#isAvailable()} <code>== true</code>, returns a new
      * {@link EpollEventLoopGroup}, otherwise returns a new
-     * {@link NioEventLoopGroup}.
+     * {@link NioEventLoopGroup}. Creates the event loop group using the
+     * default number of threads.
      * @return a new {@link EventLoopGroup}.
      */
     public static EventLoopGroup newNioOrEpollEventLoopGroup() {
@@ -47,6 +63,22 @@ public class NettyUtils {
         }
     }
 
+    /**
+     * If {@link Epoll#isAvailable()} <code>== true</code>, returns a new
+     * {@link EpollEventLoopGroup}, otherwise returns a new
+     * {@link NioEventLoopGroup}. Creates the event loop group using the
+     * specified number of threads instead of the default.
+     * @param nThreads see {@link NioEventLoopGroup#NioEventLoopGroup(int)}.
+     * @return a new {@link EventLoopGroup}.
+     */
+    public static EventLoopGroup newNioOrEpollEventLoopGroup(int nThreads) {
+        if (Epoll.isAvailable()) {
+            return new EpollEventLoopGroup(nThreads);
+        } else {
+            return new NioEventLoopGroup(nThreads);
+        }
+    }
+
     /**
      * If {@link Epoll#isAvailable()} <code>== true</code>, returns
      * {@link EpollSocketChannel}, otherwise returns {@link NioSocketChannel}.
@@ -73,4 +105,60 @@ public class NettyUtils {
             return NioServerSocketChannel.class;
         }
     }
+
+    /**
+     * Attempts to detect and return the number of local network addresses that could be
+     * used by a client to reach this server. This means we exclude the following address types:
+     * <ul>
+     *     <li>Multicast addresses. Zookeeper server sockets use TCP, thus cannot bind to a multicast address.</li>
+     *     <li>Link-local addresses. Routers don't forward traffic sent to a link-local address, so
+     *     any realistic server deployment would not have clients using these.</li>
+     *     <li>Loopback addresses. These are typically only used for testing.</li>
+     * </ul>
+     * Any remaining addresses are counted, and the total count is returned. This number is
+     * used to configure the number of threads for the "boss" event loop group, to make sure we have
+     * enough threads for each address in case the server is configured to listen on
+     * all available addresses.
+     * If listing the network interfaces fails, this method will return 1.
+     *
+     * @return the number of client-reachable local network addresses found, or
+     * 1 if listing the network interfaces fails.
+     */
+    public static int getClientReachableLocalInetAddressCount() {
+        try {
+            Set<InetAddress> validInetAddresses = new HashSet<>();
+            Enumeration<NetworkInterface> allNetworkInterfaces = NetworkInterface.getNetworkInterfaces();
+            for (NetworkInterface networkInterface : Collections.list(allNetworkInterfaces)) {
+                for (InetAddress inetAddress : Collections.list(networkInterface.getInetAddresses())) {
+                    if (inetAddress.isLinkLocalAddress()) {
+                        if (LOG.isDebugEnabled()) {
+                            LOG.debug("Ignoring link-local InetAddress {}", inetAddress);
+                        }
+                        continue;
+                    }
+                    if (inetAddress.isMulticastAddress()) {
+                        if (LOG.isDebugEnabled()) {
+                            LOG.debug("Ignoring multicast InetAddress {}", inetAddress);
+                        }
+                        continue;
+                    }
+                    if (inetAddress.isLoopbackAddress()) {
+                        if (LOG.isDebugEnabled()) {
+                            LOG.debug("Ignoring loopback InetAddress {}", inetAddress);
+                        }
+                        continue;
+                    }
+                    validInetAddresses.add(inetAddress);
+                }
+            }
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Detected {} local network addresses", validInetAddresses.size());
+                LOG.debug("Resolved local addresses are: {}", Arrays.toString(validInetAddresses.toArray()));
+            }
+            return validInetAddresses.size() > 0 ? validInetAddresses.size() : DEFAULT_INET_ADDRESS_COUNT;
+        } catch (SocketException ex) {
+            LOG.warn("Failed to list all network interfaces, assuming 1", ex);
+            return DEFAULT_INET_ADDRESS_COUNT;
+        }
+    }
 }

+ 36 - 12
zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java

@@ -28,7 +28,6 @@ import java.nio.channels.SelectionKey;
 import java.security.cert.Certificate;
 import java.util.Arrays;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufUtil;
@@ -36,7 +35,8 @@ import io.netty.buffer.Unpooled;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelFutureListener;
-import io.netty.util.ReferenceCountUtil;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.GenericFutureListener;
 import org.apache.jute.BinaryInputArchive;
 import org.apache.jute.Record;
 import org.apache.zookeeper.data.Stat;
@@ -178,17 +178,20 @@ public class NettyServerCnxn extends ServerCnxn {
         factory.addSession(sessionId, this);
     }
 
+    // Use a single listener instance to reduce GC
+    private final GenericFutureListener<Future<Void>> onSendBufferDoneListener = f -> {
+        if (f.isSuccess()) {
+            packetSent();
+        }
+    };
+
     @Override
     public void sendBuffer(ByteBuffer... buffers) {
         if (buffers.length == 1 && buffers[0] == ServerCnxnFactory.closeConn) {
             close();
             return;
         }
-        channel.writeAndFlush(Unpooled.wrappedBuffer(buffers)).addListener(f -> {
-            if (f.isSuccess()) {
-                packetSent();
-            }
-        });
+        channel.writeAndFlush(Unpooled.wrappedBuffer(buffers)).addListener(onSendBufferDoneListener);
     }
 
     /**
@@ -276,13 +279,28 @@ public class NettyServerCnxn extends ServerCnxn {
         }
     }
 
+    /**
+     * Helper that throws an IllegalStateException if the current thread is not
+     * executing in the channel's event loop thread.
+     * @param callerMethodName the name of the calling method to add to the exception message.
+     */
+    private void checkIsInEventLoop(String callerMethodName) {
+        if (!channel.eventLoop().inEventLoop()) {
+            throw new IllegalStateException(
+                    callerMethodName + "() called from non-EventLoop thread");
+        }
+    }
+
     /**
      * Process incoming message. This should only be called from the event
      * loop thread.
+     * Note that this method does not call <code>buf.release()</code>. The caller
+     * is responsible for making sure the buf is released after this method
+     * returns.
      * @param buf the message bytes to process.
      */
     void processMessage(ByteBuf buf) {
-        assert channel.eventLoop().inEventLoop();
+        checkIsInEventLoop("processMessage");
         if (LOG.isDebugEnabled()) {
             LOG.debug("0x{} queuedBuffer: {}",
                     Long.toHexString(sessionId),
@@ -341,7 +359,7 @@ public class NettyServerCnxn extends ServerCnxn {
      * from the event loop thread.
      */
     void processQueuedBuffer() {
-        assert channel.eventLoop().inEventLoop();
+        checkIsInEventLoop("processQueuedBuffer");
         if (queuedBuffer != null) {
             if (LOG.isTraceEnabled()) {
                 LOG.trace("processing queue 0x{} queuedBuffer {}",
@@ -357,6 +375,9 @@ public class NettyServerCnxn extends ServerCnxn {
                 releaseQueuedBuffer();
             } else {
                 LOG.debug("Processed queue - bytes remaining");
+                // Possibly reduce memory consumption by freeing up buffer space
+                // which is no longer needed.
+                queuedBuffer.discardSomeReadBytes();
             }
         } else {
             LOG.debug("queue empty");
@@ -368,9 +389,9 @@ public class NettyServerCnxn extends ServerCnxn {
      * called from the event loop thread.
      */
     private void releaseQueuedBuffer() {
-        assert channel.eventLoop().inEventLoop();
+        checkIsInEventLoop("releaseQueuedBuffer");
         if (queuedBuffer != null) {
-            ReferenceCountUtil.release(queuedBuffer);
+            queuedBuffer.release();
             queuedBuffer = null;
         }
     }
@@ -379,10 +400,13 @@ public class NettyServerCnxn extends ServerCnxn {
      * Receive a message, which can come from the queued buffer or from a new
      * buffer coming in over the channel. This should only be called from the
      * event loop thread.
+     * Note that this method does not call <code>message.release()</code>. The
+     * caller is responsible for making sure the message is released after this
+     * method returns.
      * @param message the message bytes to process.
      */
     private void receiveMessage(ByteBuf message) {
-        assert channel.eventLoop().inEventLoop();
+        checkIsInEventLoop("receiveMessage");
         try {
             while(message.isReadable() && !throttled.get()) {
                 if (bb != null) {

+ 16 - 12
zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxnFactory.java

@@ -203,14 +203,16 @@ public class NettyServerCnxnFactory extends ServerCnxnFactory {
             }
         }
 
-        @Override
-        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
+        // Use a single listener instance to reduce GC
+        private final GenericFutureListener<Future<Void>> onWriteCompletedListener = (f) -> {
             if (LOG.isTraceEnabled()) {
-                promise.addListener((future) -> {
-                    LOG.trace("write {}",
-                            future.isSuccess() ? "complete" : "failed");
-                });
+                LOG.trace("write {}", f.isSuccess() ? "complete" : "failed");
             }
+        };
+
+        @Override
+        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
+            promise.addListener(onWriteCompletedListener);
             super.write(ctx, msg, promise);
         }
 
@@ -285,7 +287,8 @@ public class NettyServerCnxnFactory extends ServerCnxnFactory {
     NettyServerCnxnFactory() {
         x509Util = new ClientX509Util();
 
-        EventLoopGroup bossGroup = NettyUtils.newNioOrEpollEventLoopGroup();
+        EventLoopGroup bossGroup = NettyUtils.newNioOrEpollEventLoopGroup(
+                NettyUtils.getClientReachableLocalInetAddressCount());
         EventLoopGroup workerGroup = NettyUtils.newNioOrEpollEventLoopGroup();
         ServerBootstrap bootstrap = new ServerBootstrap()
                 .group(bossGroup, workerGroup)
@@ -519,13 +522,14 @@ public class NettyServerCnxnFactory extends ServerCnxnFactory {
                 if (s.isEmpty()) {
                     ipMap.remove(remoteAddress);
                 }
-            } else {
-                LOG.error(
-                        "Unexpected null set for remote address {} when removing cnxn {}",
-                        remoteAddress,
-                        cnxn);
+                return;
             }
         }
+        // Fallthrough and log errors outside the synchronized block
+        LOG.error(
+                "Unexpected null set for remote address {} when removing cnxn {}",
+                remoteAddress,
+                cnxn);
     }
 
     private int getClientCnxnCount(InetAddress addr) {

+ 7 - 6
zookeeper-server/src/test/java/org/apache/zookeeper/server/NettyServerCnxnTest.java

@@ -32,6 +32,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.nio.charset.StandardCharsets;
 import java.util.concurrent.TimeUnit;
 
 import static org.hamcrest.Matchers.equalTo;
@@ -81,7 +82,7 @@ public class NettyServerCnxnTest extends ClientBase {
         final String path = "/a";
         try {
             // make sure zkclient works
-            zk.create(path, "test".getBytes(), Ids.OPEN_ACL_UNSAFE,
+            zk.create(path, "test".getBytes(StandardCharsets.UTF_8), Ids.OPEN_ACL_UNSAFE,
                     CreateMode.PERSISTENT);
             // set on watch
             Assert.assertNotNull("Didn't create znode:" + path,
@@ -116,14 +117,14 @@ public class NettyServerCnxnTest extends ClientBase {
             assertThat("Last client response size should be initialized with INIT_VALUE",
                     clientResponseStats.getLastBufferSize(), equalTo(BufferStats.INIT_VALUE));
 
-            zk.create("/a", "test".getBytes(), Ids.OPEN_ACL_UNSAFE,
+            zk.create("/a", "test".getBytes(StandardCharsets.UTF_8), Ids.OPEN_ACL_UNSAFE,
                     CreateMode.PERSISTENT);
 
             assertThat("Last client response size should be greater than 0 after client request was performed",
                     clientResponseStats.getLastBufferSize(), greaterThan(0));
 
             byte[] contents = zk.getData("/a", null, null);
-            assertArrayEquals("unexpected data", "test".getBytes(), contents);
+            assertArrayEquals("unexpected data", "test".getBytes(StandardCharsets.UTF_8), contents);
         }
     }
 
@@ -134,7 +135,7 @@ public class NettyServerCnxnTest extends ClientBase {
             assertThat("Last client response size should be initialized with INIT_VALUE",
                     clientResponseStats.getLastBufferSize(), equalTo(BufferStats.INIT_VALUE));
 
-            zk.create("/a", "test".getBytes(), Ids.OPEN_ACL_UNSAFE,
+            zk.create("/a", "test".getBytes(StandardCharsets.UTF_8), Ids.OPEN_ACL_UNSAFE,
                     CreateMode.PERSISTENT);
 
             assertThat("Last client response size should be greater than 0 after client request was performed",
@@ -162,7 +163,7 @@ public class NettyServerCnxnTest extends ClientBase {
             }
 
             byte[] contents = zk.getData("/a", null, null);
-            assertArrayEquals("unexpected data", "test".getBytes(), contents);
+            assertArrayEquals("unexpected data", "test".getBytes(StandardCharsets.UTF_8), contents);
 
             // As above, but don't do the throttled read. Make the request bytes wait in the socket
             // input buffer until after throttling is turned off. Need to make sure both modes work.
@@ -180,7 +181,7 @@ public class NettyServerCnxnTest extends ClientBase {
             }
 
             contents = zk.getData("/a", null, null);
-            assertArrayEquals("unexpected data", "test".getBytes(), contents);
+            assertArrayEquals("unexpected data", "test".getBytes(StandardCharsets.UTF_8), contents);
         }
     }
 }