瀏覽代碼

ZOOKEEPER-3356: Implement advanced Netty flow control based on feedback from ZK

The current implementation of enable/disable recv logic may cause the direct buffer OOM because we may enable read a large chunk and disabled again after consume a single ZK request.

This implementation disabled AUTO_READ and controls the READ depends on whether the SslHandler has issued a READ and what's the queuedBuffer status.

With this implementation, the max Netty queued buffer size (direct memory usage) will be 2 * recv_buffer size. It's not the per message size because in EPoll ET mode it will try to read until the socket is empty, and because of SslHandler will trigger another read when it's not a full encrypt packet and haven't issued any decrypt message.

Author: Fangmin Lyu <fangmin@apache.org>

Reviewers: Enrico Olivelli <eolivelli@apache.org>, Andor Molnar <andor@apache.org>

Closes #919 from lvfangmin/ZOOKEEPER-3356
Fangmin Lyu 5 年之前
父節點
當前提交
ce33b7faed

+ 6 - 0
zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md

@@ -887,6 +887,12 @@ property, when available, is noted below.
     **New in 3.6.0:**
     The time (in milliseconds) the RequestThrottler waits for the request queue to drain during shutdown before it shuts down forcefully. The default is 10000.  
 
+* *advancedFlowControlEnabled* :
+    (Java system property: **zookeeper.netty.advancedFlowControl.enabled**)
+    Using accurate flow control in netty based on the status of ZooKeeper 
+    pipeline to avoid direct buffer OOM. It will disable the AUTO_READ in
+    Netty.
+
 <a name="sc_clusterOptions"></a>
 
 #### Cluster Options

+ 15 - 4
zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java

@@ -68,6 +68,8 @@ public class NettyServerCnxn extends ServerCnxn {
     private final NettyServerCnxnFactory factory;
     private boolean initialized;
 
+    public int readIssuedAfterReadComplete;
+
     NettyServerCnxn(Channel channel, ZooKeeperServer zks, NettyServerCnxnFactory factory) {
         super(zks);
         this.channel = channel;
@@ -321,6 +323,7 @@ public class NettyServerCnxn extends ServerCnxn {
             queuedBuffer.consolidate();
         }
         queuedBuffer.addComponent(true, buf);
+        ServerMetrics.getMetrics().NETTY_QUEUED_BUFFER.add(queuedBuffer.capacity());
     }
 
     /**
@@ -553,11 +556,11 @@ public class NettyServerCnxn extends ServerCnxn {
     }
 
     /**
-     * An event that triggers a change in the channel's "Auto Read" setting.
+     * An event that triggers a change in the channel's read setting.
      * Used for throttling. By using an enum we can treat the two values as
      * singletons and compare with ==.
      */
-    enum AutoReadEvent {
+    enum ReadEvent {
         DISABLE,
         ENABLE
     }
@@ -573,7 +576,7 @@ public class NettyServerCnxn extends ServerCnxn {
             if (LOG.isDebugEnabled()) {
                 LOG.debug("Throttling - disabling recv {}", this);
             }
-            channel.pipeline().fireUserEventTriggered(AutoReadEvent.DISABLE);
+            channel.pipeline().fireUserEventTriggered(ReadEvent.DISABLE);
         }
     }
 
@@ -583,7 +586,7 @@ public class NettyServerCnxn extends ServerCnxn {
             if (LOG.isDebugEnabled()) {
                 LOG.debug("Sending unthrottle event {}", this);
             }
-            channel.pipeline().fireUserEventTriggered(AutoReadEvent.ENABLE);
+            channel.pipeline().fireUserEventTriggered(ReadEvent.ENABLE);
         }
     }
 
@@ -659,4 +662,12 @@ public class NettyServerCnxn extends ServerCnxn {
     Channel getChannel() {
         return channel;
     }
+
+    public int getQueuedReadableBytes() {
+        checkIsInEventLoop("getQueuedReadableBytes");
+        if (queuedBuffer != null) {
+            return queuedBuffer.readableBytes();
+        }
+        return 0;
+    }
 }

+ 80 - 6
zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxnFactory.java

@@ -104,6 +104,9 @@ public class NettyServerCnxnFactory extends ServerCnxnFactory {
     int listenBacklog = -1;
     private final ClientX509Util x509Util;
 
+    public static final String NETTY_ADVANCED_FLOW_CONTROL = "zookeeper.netty.advancedFlowControl.enabled";
+    private boolean advancedFlowControlEnabled = false;
+
     private static final AttributeKey<NettyServerCnxn> CONNECTION_ATTRIBUTE =
             AttributeKey.valueOf("NettyServerCnxn");
 
@@ -239,18 +242,31 @@ public class NettyServerCnxnFactory extends ServerCnxnFactory {
         @Override
         public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
             try {
-                if (evt == NettyServerCnxn.AutoReadEvent.ENABLE) {
-                    LOG.debug("Received AutoReadEvent.ENABLE");
+                if (evt == NettyServerCnxn.ReadEvent.ENABLE) {
+                    LOG.debug("Received ReadEvent.ENABLE");
                     NettyServerCnxn cnxn = ctx.channel().attr(CONNECTION_ATTRIBUTE).get();
                     // TODO(ilyam): Not sure if cnxn can be null here. It becomes null if channelInactive()
                     // or exceptionCaught() trigger, but it's unclear to me if userEventTriggered() can run
                     // after either of those. Check for null just to be safe ...
                     if (cnxn != null) {
-                        cnxn.processQueuedBuffer();
+                        if (cnxn.getQueuedReadableBytes() > 0) {
+                            cnxn.processQueuedBuffer();
+                            if (advancedFlowControlEnabled &&
+                                    cnxn.getQueuedReadableBytes() == 0) {
+                                // trigger a read if we have consumed all
+                                // backlog
+                                ctx.read();
+                                if (LOG.isDebugEnabled()) {
+                                    LOG.debug("Issued a read after queuedBuffer drained");
+                                }
+                            }
+                        }
+                    }
+                    if (!advancedFlowControlEnabled) {
+                        ctx.channel().config().setAutoRead(true);
                     }
-                    ctx.channel().config().setAutoRead(true);
-                } else if (evt == NettyServerCnxn.AutoReadEvent.DISABLE) {
-                    LOG.debug("Received AutoReadEvent.DISABLE");
+                } else if (evt == NettyServerCnxn.ReadEvent.DISABLE) {
+                    LOG.debug("Received ReadEvent.DISABLE");
                     ctx.channel().config().setAutoRead(false);
                 }
             } finally {
@@ -283,6 +299,23 @@ public class NettyServerCnxnFactory extends ServerCnxnFactory {
             }
         }
 
+        @Override
+        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
+            if (advancedFlowControlEnabled) {
+                NettyServerCnxn cnxn = ctx.channel().attr(CONNECTION_ATTRIBUTE).get();
+                if (cnxn != null && cnxn.getQueuedReadableBytes() == 0 &&
+                        cnxn.readIssuedAfterReadComplete == 0) {
+                    ctx.read();
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("Issued a read since we don't have " +
+                                "anything to consume after channelReadComplete");
+                    }
+                }
+            }
+
+            ctx.fireChannelReadComplete();
+        }
+
         // Use a single listener instance to reduce GC
         // Note: this listener is only added when LOG.isTraceEnabled() is true,
         // so it should not do any work other than trace logging.
@@ -375,8 +408,33 @@ public class NettyServerCnxnFactory extends ServerCnxnFactory {
             }
         }
     }
+
+    @Sharable
+    static class ReadIssuedTrackingHandler extends ChannelDuplexHandler {
+
+        @Override
+        public void read(ChannelHandlerContext ctx) throws Exception {
+            NettyServerCnxn cnxn = ctx.channel().attr(CONNECTION_ATTRIBUTE).get();
+            if (cnxn != null) {
+                cnxn.readIssuedAfterReadComplete++;
+            }
+
+            ctx.read();
+        }
+
+        @Override
+        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
+            NettyServerCnxn cnxn = ctx.channel().attr(CONNECTION_ATTRIBUTE).get();
+            if (cnxn != null) {
+                cnxn.readIssuedAfterReadComplete = 0;
+            }
+
+            ctx.fireChannelReadComplete();
+        }
+    }
     
     CnxnChannelHandler channelHandler = new CnxnChannelHandler();
+    ReadIssuedTrackingHandler readIssuedTrackingHandler = new ReadIssuedTrackingHandler();
 
     private ServerBootstrap configureBootstrapAllocator(ServerBootstrap bootstrap) {
         ByteBufAllocator testAllocator = TEST_ALLOCATOR.get();
@@ -404,6 +462,9 @@ public class NettyServerCnxnFactory extends ServerCnxnFactory {
         }
         this.shouldUsePortUnification = usePortUnification;
 
+        this.advancedFlowControlEnabled = Boolean.getBoolean(NETTY_ADVANCED_FLOW_CONTROL);
+        LOG.info("{} = {}", NETTY_ADVANCED_FLOW_CONTROL, this.advancedFlowControlEnabled);
+
         EventLoopGroup bossGroup = NettyUtils.newNioOrEpollEventLoopGroup(
                 NettyUtils.getClientReachableLocalInetAddressCount());
         EventLoopGroup workerGroup = NettyUtils.newNioOrEpollEventLoopGroup();
@@ -419,6 +480,9 @@ public class NettyServerCnxnFactory extends ServerCnxnFactory {
                     @Override
                     protected void initChannel(SocketChannel ch) throws Exception {
                         ChannelPipeline pipeline = ch.pipeline();
+                        if (advancedFlowControlEnabled) {
+                            pipeline.addLast(readIssuedTrackingHandler);
+                        }
                         if (secure) {
                             initSSL(pipeline, false);
                         } else if (shouldUsePortUnification) {
@@ -700,4 +764,14 @@ public class NettyServerCnxnFactory extends ServerCnxnFactory {
     static void clearTestAllocator() {
         TEST_ALLOCATOR.set(null);
     }
+
+    // VisibleForTest
+    public void setAdvancedFlowControlEnabled(boolean advancedFlowControlEnabled) {
+        this.advancedFlowControlEnabled = advancedFlowControlEnabled;
+    }
+
+    // VisibleForTest
+    public void setSecure(boolean secure) {
+        this.secure = secure;
+    }
 }

+ 4 - 0
zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java

@@ -225,6 +225,8 @@ public final class ServerMetrics {
         STALE_REQUESTS_DROPPED = metricsContext.getCounter("stale_requests_dropped");
         STALE_REPLIES = metricsContext.getCounter("stale_replies");
         REQUEST_THROTTLE_WAIT_COUNT = metricsContext.getCounter("request_throttle_wait_count");
+
+        NETTY_QUEUED_BUFFER = metricsContext.getSummary("netty_queued_buffer_capacity", DetailLevel.BASIC);
     }
 
     /**
@@ -424,6 +426,8 @@ public final class ServerMetrics {
     public final Counter STALE_REPLIES;
     public final Counter REQUEST_THROTTLE_WAIT_COUNT;
 
+    public final Summary NETTY_QUEUED_BUFFER;
+
     private final MetricsProvider metricsProvider;
 
     public void resetAll() {

+ 139 - 1
zookeeper-server/src/test/java/org/apache/zookeeper/server/NettyServerCnxnTest.java

@@ -18,14 +18,17 @@
 
 package org.apache.zookeeper.server;
 
-
+import org.apache.zookeeper.AsyncCallback.DataCallback;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.ZooDefs.Ids;
 import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.common.ClientX509Util;
+import org.apache.zookeeper.data.Stat;
 import org.apache.zookeeper.test.TestByteBufAllocator;
 import org.apache.zookeeper.server.quorum.BufferStats;
 import org.apache.zookeeper.test.ClientBase;
+import org.apache.zookeeper.test.SSLAuthTest;
 import org.junit.Assert;
 import org.junit.Test;
 import org.slf4j.Logger;
@@ -34,7 +37,11 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.net.ProtocolException;
 import java.nio.charset.StandardCharsets;
+import java.util.Random;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThan;
@@ -203,4 +210,135 @@ public class NettyServerCnxnTest extends ClientBase {
             assertArrayEquals("unexpected data", "test".getBytes(StandardCharsets.UTF_8), contents);
         }
     }
+
+    @Test
+    public void testEnableDisableThrottling_secure_random() throws Exception {
+        runEnableDisableThrottling(true, true);
+    }
+
+    @Test
+    public void testEnableDisableThrottling_secure_sequentially() throws Exception {
+        runEnableDisableThrottling(true, false);
+    }
+
+    @Test
+    public void testEnableDisableThrottling_nonSecure_random() throws Exception {
+        runEnableDisableThrottling(false, true);
+    }
+
+    @Test
+    public void testEnableDisableThrottling_nonSecure_sequentially() throws Exception {
+        runEnableDisableThrottling(false, false);
+    }
+
+    private void runEnableDisableThrottling(boolean secure, boolean randomDisableEnable) throws Exception {
+        ClientX509Util x509Util = null;
+        if (secure) {
+            x509Util = SSLAuthTest.setUpSecure();
+        }
+        try {
+            NettyServerCnxnFactory factory = (NettyServerCnxnFactory) serverFactory;
+            factory.setAdvancedFlowControlEnabled(true);
+            if (secure) {
+                factory.setSecure(true);
+            }
+
+            final String path = "/testEnableDisableThrottling";
+            try (ZooKeeper zk = createClient()) {
+                zk.create(path, new byte[1], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+
+                // meanwhile start another thread to enable and disable recv
+                AtomicBoolean stopped = new AtomicBoolean(false);
+                Random random = new Random();
+
+                Thread enableDisableThread = null;
+                if (randomDisableEnable) {
+                    enableDisableThread = new Thread() {
+                        @Override
+                        public void run() {
+                            while (!stopped.get()) {
+                                for (final ServerCnxn cnxn : serverFactory.cnxns) {
+                                    boolean shouldDisableEnable = random.nextBoolean();
+                                    if (shouldDisableEnable) {
+                                        cnxn.disableRecv();
+                                    } else {
+                                        cnxn.enableRecv();
+                                    }
+                                }
+                                try {
+                                    Thread.sleep(10);
+                                } catch (InterruptedException e) { /* ignore */ }
+                            }
+                            // always enable the recv at end
+                            for (final ServerCnxn cnxn : serverFactory.cnxns) {
+                                cnxn.enableRecv();
+                            }
+                        }
+                    };
+                } else {
+                    enableDisableThread = new Thread() {
+                        @Override
+                        public void run() {
+                            while (!stopped.get()) {
+                                for (final ServerCnxn cnxn : serverFactory.cnxns) {
+                                    try {
+                                        cnxn.disableRecv();
+                                        Thread.sleep(10);
+                                        cnxn.enableRecv();
+                                        Thread.sleep(10);
+                                    } catch (InterruptedException e) { /* ignore */ }
+                                }
+                            }
+                        }
+                    };
+                }
+                enableDisableThread.start();
+                LOG.info("started thread to enable and disable recv");
+
+                // start a thread to keep sending requests
+                int totalRequestsNum = 100000;
+                AtomicInteger successResponse = new AtomicInteger();
+                CountDownLatch responseReceivedLatch = new CountDownLatch(totalRequestsNum);
+                Thread clientThread = new Thread() {
+                    @Override
+                    public void run() {
+                        int requestIssued = 0;
+                        while (requestIssued++ < totalRequestsNum) {
+                            zk.getData(path, null, new DataCallback() {
+                                @Override
+                                public void processResult(int rc, String path,
+                                        Object ctx, byte data[], Stat stat) {
+                                    if (rc == 0) {
+                                        successResponse.addAndGet(1);
+                                    } else {
+                                        LOG.info("failed response is {}", rc);
+                                    }
+                                    responseReceivedLatch.countDown();
+                                }
+                            }, null);
+                        }
+                    }
+                };
+                clientThread.start();
+                LOG.info("started thread to issue {} async requests", totalRequestsNum);
+
+                // and verify the response received is same as what we issued
+                Assert.assertTrue(responseReceivedLatch.await(60, TimeUnit.SECONDS));
+                LOG.info("received all {} responses", totalRequestsNum);
+
+                stopped.set(true);
+                enableDisableThread.join();
+                LOG.info("enable and disable recv thread exited");
+
+                // wait another second for the left requests to finish
+                LOG.info("waiting another 1s for the requests to go through");
+                Thread.sleep(1000);
+                Assert.assertEquals(successResponse.get(), totalRequestsNum);
+            }
+        } finally {
+            if (secure) {
+                SSLAuthTest.clearSecureSetting(x509Util);
+            }
+        }
+    }
 }

+ 30 - 21
zookeeper-server/src/test/java/org/apache/zookeeper/test/SSLAuthTest.java

@@ -33,34 +33,24 @@ import org.junit.Test;
 public class SSLAuthTest extends ClientBase {
     
     private ClientX509Util clientX509Util;
-    
-    @Before
-    public void setUp() throws Exception {
-        clientX509Util = new ClientX509Util();
+
+    public static ClientX509Util setUpSecure() throws Exception{
+        ClientX509Util x509Util = new ClientX509Util();
         String testDataPath = System.getProperty("test.data.dir", "src/test/resources/data");
         System.setProperty(ServerCnxnFactory.ZOOKEEPER_SERVER_CNXN_FACTORY, "org.apache.zookeeper.server.NettyServerCnxnFactory");
         System.setProperty(ZKClientConfig.ZOOKEEPER_CLIENT_CNXN_SOCKET, "org.apache.zookeeper.ClientCnxnSocketNetty");
         System.setProperty(ZKClientConfig.SECURE_CLIENT, "true");
-        System.setProperty(clientX509Util.getSslAuthProviderProperty(), "x509");
-        System.setProperty(clientX509Util.getSslKeystoreLocationProperty(), testDataPath + "/ssl/testKeyStore.jks");
-        System.setProperty(clientX509Util.getSslKeystorePasswdProperty(), "testpass");
-        System.setProperty(clientX509Util.getSslTruststoreLocationProperty(), testDataPath + "/ssl/testTrustStore.jks");
-        System.setProperty(clientX509Util.getSslTruststorePasswdProperty(), "testpass");
+        System.setProperty(x509Util.getSslAuthProviderProperty(), "x509");
+        System.setProperty(x509Util.getSslKeystoreLocationProperty(), testDataPath + "/ssl/testKeyStore.jks");
+        System.setProperty(x509Util.getSslKeystorePasswdProperty(), "testpass");
+        System.setProperty(x509Util.getSslTruststoreLocationProperty(), testDataPath + "/ssl/testTrustStore.jks");
+        System.setProperty(x509Util.getSslTruststorePasswdProperty(), "testpass");
         System.setProperty("javax.net.debug", "ssl");
         System.setProperty("zookeeper.authProvider.x509", "org.apache.zookeeper.server.auth.X509AuthenticationProvider");
-
-        String host = "localhost";
-        int port = PortAssignment.unique();
-        hostPort = host + ":" + port;
-
-        serverFactory = ServerCnxnFactory.createFactory();
-        serverFactory.configure(new InetSocketAddress(host, port), maxCnxns, -1, true);
-
-        super.setUp();
+        return x509Util;
     }
 
-    @After
-    public void teardown() throws Exception {
+    public static void clearSecureSetting(ClientX509Util clientX509Util) {
         System.clearProperty(ServerCnxnFactory.ZOOKEEPER_SERVER_CNXN_FACTORY);
         System.clearProperty(ZKClientConfig.ZOOKEEPER_CLIENT_CNXN_SOCKET);
         System.clearProperty(ZKClientConfig.SECURE_CLIENT);
@@ -73,6 +63,25 @@ public class SSLAuthTest extends ClientBase {
         System.clearProperty("zookeeper.authProvider.x509");
         clientX509Util.close();
     }
+    
+    @Before
+    public void setUp() throws Exception {
+        clientX509Util = setUpSecure();
+
+        String host = "localhost";
+        int port = PortAssignment.unique();
+        hostPort = host + ":" + port;
+
+        serverFactory = ServerCnxnFactory.createFactory();
+        serverFactory.configure(new InetSocketAddress(host, port), maxCnxns, -1, true);
+
+        super.setUp();
+    }
+
+    @After
+    public void teardown() throws Exception {
+        clearSecureSetting(clientX509Util);
+    }
 
     @Test
     public void testRejection() throws Exception {
@@ -103,4 +112,4 @@ public class SSLAuthTest extends ClientBase {
         Assert.assertFalse("Missing SSL configuration should not result in successful connection",
                 watcher.clientConnected.await(1000, TimeUnit.MILLISECONDS));
     }
-}
+}