瀏覽代碼

ZOOKEEPER-3131: Remove watcher when session closed in NettyServerCnxn

Currently, it doesn't remove itself from ZK server when the cnxn is closed, which
will leak watchers, close it to make it align with NIO implementation.

Author: Fangmin Lyu <allenlyu@fb.com>

Reviewers: hanm, anmolnar, nkalmar

Closes #612 from lvfangmin/ZOOKEEPER-3131
Fangmin Lyu 6 年之前
父節點
當前提交
95557a30ed

+ 11 - 7
src/java/main/org/apache/zookeeper/server/NettyServerCnxn.java

@@ -73,7 +73,7 @@ public class NettyServerCnxn extends ServerCnxn {
 
 
     NettyServerCnxnFactory factory;
     NettyServerCnxnFactory factory;
     boolean initialized;
     boolean initialized;
-    
+
     NettyServerCnxn(Channel channel, ZooKeeperServer zks, NettyServerCnxnFactory factory) {
     NettyServerCnxn(Channel channel, ZooKeeperServer zks, NettyServerCnxnFactory factory) {
         this.channel = channel;
         this.channel = channel;
         this.closingChannel = false;
         this.closingChannel = false;
@@ -83,11 +83,11 @@ public class NettyServerCnxn extends ServerCnxn {
             this.zooKeeperSaslServer = new ZooKeeperSaslServer(factory.login);
             this.zooKeeperSaslServer = new ZooKeeperSaslServer(factory.login);
         }
         }
     }
     }
-    
+
     @Override
     @Override
     public void close() {
     public void close() {
         closingChannel = true;
         closingChannel = true;
-        
+
         if (LOG.isDebugEnabled()) {
         if (LOG.isDebugEnabled()) {
             LOG.debug("close called for sessionid:0x"
             LOG.debug("close called for sessionid:0x"
                     + Long.toHexString(sessionId));
                     + Long.toHexString(sessionId));
@@ -119,6 +119,10 @@ public class NettyServerCnxn extends ServerCnxn {
             }
             }
         }
         }
 
 
+        if (zkServer != null) {
+            zkServer.removeCnxn(this);
+        }
+
         if (channel.isOpen()) {
         if (channel.isOpen()) {
             // Since we don't check on the futures created by write calls to the channel complete we need to make sure
             // Since we don't check on the futures created by write calls to the channel complete we need to make sure
             // that all writes have been completed before closing the channel or we risk data loss
             // that all writes have been completed before closing the channel or we risk data loss
@@ -174,7 +178,7 @@ public class NettyServerCnxn extends ServerCnxn {
         @Override
         @Override
         public ChannelFuture getFuture() {return null;}
         public ChannelFuture getFuture() {return null;}
     };
     };
-    
+
     @Override
     @Override
     public void sendResponse(ReplyHeader h, Record r, String tag)
     public void sendResponse(ReplyHeader h, Record r, String tag)
             throws IOException {
             throws IOException {
@@ -226,7 +230,7 @@ public class NettyServerCnxn extends ServerCnxn {
      */
      */
     private class SendBufferWriter extends Writer {
     private class SendBufferWriter extends Writer {
         private StringBuffer sb = new StringBuffer();
         private StringBuffer sb = new StringBuffer();
-        
+
         /**
         /**
          * Check if we are ready to send another chunk.
          * Check if we are ready to send another chunk.
          * @param force force sending, even if not a full chunk
          * @param force force sending, even if not a full chunk
@@ -415,7 +419,7 @@ public class NettyServerCnxn extends ServerCnxn {
     public void disableRecv() {
     public void disableRecv() {
         disableRecvNoWait().awaitUninterruptibly();
         disableRecvNoWait().awaitUninterruptibly();
     }
     }
-    
+
     private ChannelFuture disableRecvNoWait() {
     private ChannelFuture disableRecvNoWait() {
         throttled = true;
         throttled = true;
         if (LOG.isDebugEnabled()) {
         if (LOG.isDebugEnabled()) {
@@ -423,7 +427,7 @@ public class NettyServerCnxn extends ServerCnxn {
         }
         }
         return channel.setReadable(false);
         return channel.setReadable(false);
     }
     }
-    
+
     @Override
     @Override
     public long getOutstandingRequests() {
     public long getOutstandingRequests() {
         return outstandingCount.longValue();
         return outstandingCount.longValue();

+ 7 - 2
src/java/test/org/apache/zookeeper/server/NettyServerCnxnTest.java

@@ -56,7 +56,7 @@ public class NettyServerCnxnTest extends ClientBase {
      * servercnxnfactory should remove all channel references to avoid
      * servercnxnfactory should remove all channel references to avoid
      * duplicate channel closure. Duplicate closure may result in indefinite
      * duplicate channel closure. Duplicate closure may result in indefinite
      * hanging due to netty open issue.
      * hanging due to netty open issue.
-     * 
+     *
      * @see <a href="https://issues.jboss.org/browse/NETTY-412">NETTY-412</a>
      * @see <a href="https://issues.jboss.org/browse/NETTY-412">NETTY-412</a>
      */
      */
     @Test(timeout = 40000)
     @Test(timeout = 40000)
@@ -66,13 +66,16 @@ public class NettyServerCnxnTest extends ClientBase {
                 serverFactory instanceof NettyServerCnxnFactory);
                 serverFactory instanceof NettyServerCnxnFactory);
 
 
         final ZooKeeper zk = createClient();
         final ZooKeeper zk = createClient();
+        final ZooKeeperServer zkServer = getServer(serverFactory);
         final String path = "/a";
         final String path = "/a";
         try {
         try {
             // make sure zkclient works
             // make sure zkclient works
             zk.create(path, "test".getBytes(), Ids.OPEN_ACL_UNSAFE,
             zk.create(path, "test".getBytes(), Ids.OPEN_ACL_UNSAFE,
                     CreateMode.PERSISTENT);
                     CreateMode.PERSISTENT);
+            // set on watch
             Assert.assertNotNull("Didn't create znode:" + path,
             Assert.assertNotNull("Didn't create znode:" + path,
-                    zk.exists(path, false));
+                    zk.exists(path, true));
+            Assert.assertEquals(1, zkServer.getZKDatabase().getDataTree().getWatchCount());
             Iterable<ServerCnxn> connections = serverFactory.getConnections();
             Iterable<ServerCnxn> connections = serverFactory.getConnections();
             Assert.assertEquals("Mismatch in number of live connections!", 1,
             Assert.assertEquals("Mismatch in number of live connections!", 1,
                     serverFactory.getNumAliveConnections());
                     serverFactory.getNumAliveConnections());
@@ -88,6 +91,8 @@ public class NettyServerCnxnTest extends ClientBase {
                     Assert.fail("The number of live connections should be 0");
                     Assert.fail("The number of live connections should be 0");
                 }
                 }
             }
             }
+            // make sure the watch is removed when the connection closed
+            Assert.assertEquals(0, zkServer.getZKDatabase().getDataTree().getWatchCount());
         } finally {
         } finally {
             zk.close();
             zk.close();
         }
         }