Parcourir la source

ZOOKEEPER-2072 Netty Server Should Configure Child Channel Pipeline By Specifying ChannelPipelineFactory(Hongchao via rakeshr)

git-svn-id: https://svn.apache.org/repos/asf/zookeeper/trunk@1655910 13f79535-47bb-0310-9956-ffa450edef68
Rakesh Radhakrishnan il y a 10 ans
Parent
commit
a95add82db

+ 3 - 0
CHANGES.txt

@@ -22,6 +22,9 @@ BUGFIXES:
   
   
   ZOOKEEPER-2060 Trace bug in NettyServerCnxnFactory (Ian via fpj)
   ZOOKEEPER-2060 Trace bug in NettyServerCnxnFactory (Ian via fpj)
 
 
+  ZOOKEEPER-2072 Netty Server Should Configure Child Channel Pipeline By Specifying 
+  ChannelPipelineFactory (Hongchao via rakeshr)
+
 IMPROVEMENTS:
 IMPROVEMENTS:
   ZOOKEEPER-1660 Documentation for Dynamic Reconfiguration (Reed Wanderman-Milne via shralex)  
   ZOOKEEPER-1660 Documentation for Dynamic Reconfiguration (Reed Wanderman-Milne via shralex)  
 
 

+ 12 - 2
src/java/main/org/apache/zookeeper/server/NettyServerCnxnFactory.java

@@ -35,7 +35,10 @@ import org.jboss.netty.buffer.ChannelBuffers;
 import org.jboss.netty.channel.Channel;
 import org.jboss.netty.channel.Channel;
 import org.jboss.netty.channel.ChannelHandler.Sharable;
 import org.jboss.netty.channel.ChannelHandler.Sharable;
 import org.jboss.netty.channel.ChannelHandlerContext;
 import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ChannelPipeline;
+import org.jboss.netty.channel.ChannelPipelineFactory;
 import org.jboss.netty.channel.ChannelStateEvent;
 import org.jboss.netty.channel.ChannelStateEvent;
+import org.jboss.netty.channel.Channels;
 import org.jboss.netty.channel.ExceptionEvent;
 import org.jboss.netty.channel.ExceptionEvent;
 import org.jboss.netty.channel.MessageEvent;
 import org.jboss.netty.channel.MessageEvent;
 import org.jboss.netty.channel.SimpleChannelHandler;
 import org.jboss.netty.channel.SimpleChannelHandler;
@@ -242,7 +245,7 @@ public class NettyServerCnxnFactory extends ServerCnxnFactory {
     }
     }
     
     
     CnxnChannelHandler channelHandler = new CnxnChannelHandler();
     CnxnChannelHandler channelHandler = new CnxnChannelHandler();
-    
+
     NettyServerCnxnFactory() {
     NettyServerCnxnFactory() {
         bootstrap = new ServerBootstrap(
         bootstrap = new ServerBootstrap(
                 new NioServerSocketChannelFactory(
                 new NioServerSocketChannelFactory(
@@ -254,8 +257,15 @@ public class NettyServerCnxnFactory extends ServerCnxnFactory {
         bootstrap.setOption("child.tcpNoDelay", true);
         bootstrap.setOption("child.tcpNoDelay", true);
         /* set socket linger to off, so that socket close does not block */
         /* set socket linger to off, so that socket close does not block */
         bootstrap.setOption("child.soLinger", -1);
         bootstrap.setOption("child.soLinger", -1);
+        bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
+            @Override
+            public ChannelPipeline getPipeline() throws Exception {
+                ChannelPipeline p = Channels.pipeline();
+                p.addLast("servercnxnfactory", channelHandler);
 
 
-        bootstrap.getPipeline().addLast("servercnxnfactory", channelHandler);
+                return p;
+            }
+        });
     }
     }
     
     
     @Override
     @Override

+ 8 - 24
src/java/test/org/apache/zookeeper/server/NettyServerCnxnTest.java

@@ -18,18 +18,12 @@
 
 
 package org.apache.zookeeper.server;
 package org.apache.zookeeper.server;
 
 
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
 import junit.framework.Assert;
 import junit.framework.Assert;
 
 
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.ZooDefs.Ids;
 import org.apache.zookeeper.ZooDefs.Ids;
 import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.server.NettyServerCnxnFactory.CnxnChannelHandler;
 import org.apache.zookeeper.test.ClientBase;
 import org.apache.zookeeper.test.ClientBase;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.ChannelStateEvent;
 import org.junit.Test;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.LoggerFactory;
@@ -63,21 +57,6 @@ public class NettyServerCnxnTest extends ClientBase {
                 "Didn't instantiate ServerCnxnFactory with NettyServerCnxnFactory!",
                 "Didn't instantiate ServerCnxnFactory with NettyServerCnxnFactory!",
                 serverFactory instanceof NettyServerCnxnFactory);
                 serverFactory instanceof NettyServerCnxnFactory);
 
 
-        NettyServerCnxnFactory nettyServerFactory = (NettyServerCnxnFactory) serverFactory;
-        final CountDownLatch channelLatch = new CountDownLatch(1);
-        CnxnChannelHandler channelHandler = nettyServerFactory.new CnxnChannelHandler() {
-            @Override
-            public void channelDisconnected(ChannelHandlerContext ctx,
-                    ChannelStateEvent e) throws Exception {
-                LOG.info("Recieves channel disconnected event");
-                channelLatch.countDown();
-            }
-        };
-        LOG.info("Adding custom channel handler for simulation");
-        nettyServerFactory.bootstrap.getPipeline().remove("servercnxnfactory");
-        nettyServerFactory.bootstrap.getPipeline().addLast("servercnxnfactory",
-                channelHandler);
-
         final ZooKeeper zk = createClient();
         final ZooKeeper zk = createClient();
         final String path = "/a";
         final String path = "/a";
         try {
         try {
@@ -93,9 +72,14 @@ public class NettyServerCnxnTest extends ClientBase {
                 serverCnxn.sendCloseSession();
                 serverCnxn.sendCloseSession();
             }
             }
             LOG.info("Waiting for the channel disconnected event");
             LOG.info("Waiting for the channel disconnected event");
-            channelLatch.await(CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS);
-            Assert.assertEquals("Mismatch in number of live connections!", 0,
-                    serverFactory.getNumAliveConnections());
+            int timeout = 0;
+            while (serverFactory.getNumAliveConnections() != 0) {
+                Thread.sleep(1000);
+                timeout += 1000;
+                if (timeout > CONNECTION_TIMEOUT) {
+                    Assert.fail("The number of live connections should be 0");
+                }
+            }
         } finally {
         } finally {
             zk.close();
             zk.close();
         }
         }