|
@@ -47,6 +47,7 @@ import org.jboss.netty.buffer.ChannelBuffer;
|
|
|
import org.jboss.netty.buffer.ChannelBuffers;
|
|
|
import org.jboss.netty.channel.Channel;
|
|
|
import org.jboss.netty.channel.ChannelFuture;
|
|
|
+import org.jboss.netty.channel.ChannelFutureListener;
|
|
|
import org.jboss.netty.channel.MessageEvent;
|
|
|
import org.slf4j.Logger;
|
|
|
import org.slf4j.LoggerFactory;
|
|
@@ -62,6 +63,7 @@ public class NettyServerCnxn extends ServerCnxn {
|
|
|
int sessionTimeout;
|
|
|
AtomicLong outstandingCount = new AtomicLong();
|
|
|
Certificate[] clientChain;
|
|
|
+ volatile boolean closingChannel;
|
|
|
|
|
|
/** The ZooKeeperServer for this connection. May be null if the server
|
|
|
* is not currently serving requests (for example if the server is not
|
|
@@ -74,6 +76,7 @@ public class NettyServerCnxn extends ServerCnxn {
|
|
|
|
|
|
NettyServerCnxn(Channel channel, ZooKeeperServer zks, NettyServerCnxnFactory factory) {
|
|
|
this.channel = channel;
|
|
|
+ this.closingChannel = false;
|
|
|
this.zkServer = zks;
|
|
|
this.factory = factory;
|
|
|
if (this.factory.login != null) {
|
|
@@ -83,6 +86,8 @@ public class NettyServerCnxn extends ServerCnxn {
|
|
|
|
|
|
@Override
|
|
|
public void close() {
|
|
|
+ closingChannel = true;
|
|
|
+
|
|
|
if (LOG.isDebugEnabled()) {
|
|
|
LOG.debug("close called for sessionid:0x"
|
|
|
+ Long.toHexString(sessionId));
|
|
@@ -115,7 +120,10 @@ public class NettyServerCnxn extends ServerCnxn {
|
|
|
}
|
|
|
|
|
|
if (channel.isOpen()) {
|
|
|
- channel.close();
|
|
|
+ // Since we don't check on the futures created by write calls to the channel complete we need to make sure
|
|
|
+ // that all writes have been completed before closing the channel or we risk data loss
|
|
|
+ // See: http://lists.jboss.org/pipermail/netty-users/2009-August/001122.html
|
|
|
+ channel.write(ChannelBuffers.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -171,7 +179,7 @@ public class NettyServerCnxn extends ServerCnxn {
|
|
|
@Override
|
|
|
public void sendResponse(ReplyHeader h, Record r, String tag)
|
|
|
throws IOException {
|
|
|
- if (!channel.isOpen()) {
|
|
|
+ if (closingChannel || !channel.isOpen()) {
|
|
|
return;
|
|
|
}
|
|
|
ByteArrayOutputStream baos = new ByteArrayOutputStream();
|