|
@@ -18,41 +18,6 @@
|
|
|
|
|
|
package org.apache.zookeeper.server;
|
|
|
|
|
|
-import org.apache.zookeeper.KeeperException;
|
|
|
-import org.apache.zookeeper.common.ClientX509Util;
|
|
|
-import org.apache.zookeeper.common.X509Exception;
|
|
|
-import org.apache.zookeeper.common.X509Exception.SSLContextException;
|
|
|
-import org.apache.zookeeper.server.auth.ProviderRegistry;
|
|
|
-import org.apache.zookeeper.server.auth.X509AuthenticationProvider;
|
|
|
-import org.jboss.netty.bootstrap.ServerBootstrap;
|
|
|
-import org.jboss.netty.buffer.ChannelBuffer;
|
|
|
-import org.jboss.netty.buffer.ChannelBuffers;
|
|
|
-import org.jboss.netty.channel.Channel;
|
|
|
-import org.jboss.netty.channel.ChannelFuture;
|
|
|
-import org.jboss.netty.channel.ChannelFutureListener;
|
|
|
-import org.jboss.netty.channel.ChannelHandler.Sharable;
|
|
|
-import org.jboss.netty.channel.ChannelHandlerContext;
|
|
|
-import org.jboss.netty.channel.ChannelPipeline;
|
|
|
-import org.jboss.netty.channel.ChannelPipelineFactory;
|
|
|
-import org.jboss.netty.channel.ChannelStateEvent;
|
|
|
-import org.jboss.netty.channel.Channels;
|
|
|
-import org.jboss.netty.channel.ExceptionEvent;
|
|
|
-import org.jboss.netty.channel.MessageEvent;
|
|
|
-import org.jboss.netty.channel.SimpleChannelHandler;
|
|
|
-import org.jboss.netty.channel.WriteCompletionEvent;
|
|
|
-import org.jboss.netty.channel.group.ChannelGroup;
|
|
|
-import org.jboss.netty.channel.group.DefaultChannelGroup;
|
|
|
-import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
|
|
|
-import org.jboss.netty.handler.ssl.SslHandler;
|
|
|
-import org.slf4j.Logger;
|
|
|
-import org.slf4j.LoggerFactory;
|
|
|
-
|
|
|
-import javax.net.ssl.SSLContext;
|
|
|
-import javax.net.ssl.SSLEngine;
|
|
|
-import javax.net.ssl.SSLPeerUnverifiedException;
|
|
|
-import javax.net.ssl.SSLSession;
|
|
|
-import javax.net.ssl.X509KeyManager;
|
|
|
-import javax.net.ssl.X509TrustManager;
|
|
|
import java.io.IOException;
|
|
|
import java.net.InetAddress;
|
|
|
import java.net.InetSocketAddress;
|
|
@@ -61,51 +26,86 @@ import java.security.NoSuchAlgorithmException;
|
|
|
import java.util.HashMap;
|
|
|
import java.util.HashSet;
|
|
|
import java.util.Map;
|
|
|
+import java.util.Objects;
|
|
|
import java.util.Set;
|
|
|
-import java.util.concurrent.Executors;
|
|
|
+import java.util.concurrent.atomic.AtomicReference;
|
|
|
+
|
|
|
+import javax.net.ssl.SSLContext;
|
|
|
+import javax.net.ssl.SSLEngine;
|
|
|
+import javax.net.ssl.SSLPeerUnverifiedException;
|
|
|
+import javax.net.ssl.SSLSession;
|
|
|
+import javax.net.ssl.X509KeyManager;
|
|
|
+import javax.net.ssl.X509TrustManager;
|
|
|
|
|
|
-import static org.jboss.netty.buffer.ChannelBuffers.dynamicBuffer;
|
|
|
+import io.netty.bootstrap.ServerBootstrap;
|
|
|
+import io.netty.buffer.ByteBuf;
|
|
|
+import io.netty.buffer.ByteBufAllocator;
|
|
|
+import io.netty.channel.Channel;
|
|
|
+import io.netty.channel.ChannelDuplexHandler;
|
|
|
+import io.netty.channel.ChannelFuture;
|
|
|
+import io.netty.channel.ChannelHandler.Sharable;
|
|
|
+import io.netty.channel.ChannelHandlerContext;
|
|
|
+import io.netty.channel.ChannelInitializer;
|
|
|
+import io.netty.channel.ChannelOption;
|
|
|
+import io.netty.channel.ChannelPipeline;
|
|
|
+import io.netty.channel.ChannelPromise;
|
|
|
+import io.netty.channel.EventLoopGroup;
|
|
|
+import io.netty.channel.group.ChannelGroup;
|
|
|
+import io.netty.channel.group.ChannelGroupFuture;
|
|
|
+import io.netty.channel.group.DefaultChannelGroup;
|
|
|
+import io.netty.channel.socket.SocketChannel;
|
|
|
+import io.netty.handler.ssl.SslHandler;
|
|
|
+import io.netty.util.AttributeKey;
|
|
|
+import io.netty.util.ReferenceCountUtil;
|
|
|
+import io.netty.util.concurrent.DefaultEventExecutor;
|
|
|
+import io.netty.util.concurrent.Future;
|
|
|
+import io.netty.util.concurrent.GenericFutureListener;
|
|
|
+import org.apache.zookeeper.KeeperException;
|
|
|
+import org.apache.zookeeper.common.ClientX509Util;
|
|
|
+import org.apache.zookeeper.common.NettyUtils;
|
|
|
+import org.apache.zookeeper.common.X509Exception;
|
|
|
+import org.apache.zookeeper.common.X509Exception.SSLContextException;
|
|
|
+import org.apache.zookeeper.server.auth.ProviderRegistry;
|
|
|
+import org.apache.zookeeper.server.auth.X509AuthenticationProvider;
|
|
|
+import org.slf4j.Logger;
|
|
|
+import org.slf4j.LoggerFactory;
|
|
|
|
|
|
public class NettyServerCnxnFactory extends ServerCnxnFactory {
|
|
|
private static final Logger LOG = LoggerFactory.getLogger(NettyServerCnxnFactory.class);
|
|
|
|
|
|
- ServerBootstrap bootstrap;
|
|
|
- Channel parentChannel;
|
|
|
- ChannelGroup allChannels = new DefaultChannelGroup("zkServerCnxns");
|
|
|
- Map<InetAddress, Set<NettyServerCnxn>> ipMap =
|
|
|
- new HashMap<InetAddress, Set<NettyServerCnxn>>( );
|
|
|
- InetSocketAddress localAddress;
|
|
|
- int maxClientCnxns = 60;
|
|
|
- ClientX509Util x509Util;
|
|
|
+ private final ServerBootstrap bootstrap;
|
|
|
+ private Channel parentChannel;
|
|
|
+ private final ChannelGroup allChannels =
|
|
|
+ new DefaultChannelGroup("zkServerCnxns", new DefaultEventExecutor());
|
|
|
+ // Access to ipMap or to any Set contained in the map needs to be
|
|
|
+ // protected with synchronized (ipMap) { ... }
|
|
|
+ private final Map<InetAddress, Set<NettyServerCnxn>> ipMap = new HashMap<>();
|
|
|
+ private InetSocketAddress localAddress;
|
|
|
+ private int maxClientCnxns = 60;
|
|
|
+ private final ClientX509Util x509Util;
|
|
|
+
|
|
|
+ private static final AttributeKey<NettyServerCnxn> CONNECTION_ATTRIBUTE =
|
|
|
+ AttributeKey.valueOf("NettyServerCnxn");
|
|
|
+
|
|
|
+ private static final AtomicReference<ByteBufAllocator> TEST_ALLOCATOR =
|
|
|
+ new AtomicReference<>(null);
|
|
|
|
|
|
/**
|
|
|
- * This is an inner class since we need to extend SimpleChannelHandler, but
|
|
|
+ * This is an inner class since we need to extend ChannelDuplexHandler, but
|
|
|
* NettyServerCnxnFactory already extends ServerCnxnFactory. By making it inner
|
|
|
* this class gets access to the member variables and methods.
|
|
|
*/
|
|
|
@Sharable
|
|
|
- class CnxnChannelHandler extends SimpleChannelHandler {
|
|
|
-
|
|
|
- @Override
|
|
|
- public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e)
|
|
|
- throws Exception
|
|
|
- {
|
|
|
- if (LOG.isTraceEnabled()) {
|
|
|
- LOG.trace("Channel closed " + e);
|
|
|
- }
|
|
|
- allChannels.remove(ctx.getChannel());
|
|
|
- }
|
|
|
+ class CnxnChannelHandler extends ChannelDuplexHandler {
|
|
|
|
|
|
@Override
|
|
|
- public void channelConnected(ChannelHandlerContext ctx,
|
|
|
- ChannelStateEvent e) throws Exception
|
|
|
- {
|
|
|
+ public void channelActive(ChannelHandlerContext ctx) throws Exception {
|
|
|
if (LOG.isTraceEnabled()) {
|
|
|
- LOG.trace("Channel connected " + e);
|
|
|
+ LOG.trace("Channel active {}", ctx.channel());
|
|
|
}
|
|
|
|
|
|
- Channel channel = ctx.getChannel();
|
|
|
- InetAddress addr = ((InetSocketAddress) channel.getRemoteAddress())
|
|
|
+ final Channel channel = ctx.channel();
|
|
|
+ InetAddress addr = ((InetSocketAddress) channel.remoteAddress())
|
|
|
.getAddress();
|
|
|
if (maxClientCnxns > 0 && getClientCnxnCount(addr) >= maxClientCnxns) {
|
|
|
LOG.warn("Too many connections from {} - max is {}", addr,
|
|
@@ -116,170 +116,104 @@ public class NettyServerCnxnFactory extends ServerCnxnFactory {
|
|
|
|
|
|
NettyServerCnxn cnxn = new NettyServerCnxn(channel,
|
|
|
zkServer, NettyServerCnxnFactory.this);
|
|
|
- ctx.setAttachment(cnxn);
|
|
|
+ ctx.channel().attr(CONNECTION_ATTRIBUTE).set(cnxn);
|
|
|
|
|
|
if (secure) {
|
|
|
- SslHandler sslHandler = ctx.getPipeline().get(SslHandler.class);
|
|
|
- ChannelFuture handshakeFuture = sslHandler.handshake();
|
|
|
+ SslHandler sslHandler = ctx.pipeline().get(SslHandler.class);
|
|
|
+ Future<Channel> handshakeFuture = sslHandler.handshakeFuture();
|
|
|
handshakeFuture.addListener(new CertificateVerifier(sslHandler, cnxn));
|
|
|
} else {
|
|
|
- allChannels.add(ctx.getChannel());
|
|
|
+ allChannels.add(ctx.channel());
|
|
|
addCnxn(cnxn);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- public void channelDisconnected(ChannelHandlerContext ctx,
|
|
|
- ChannelStateEvent e) throws Exception
|
|
|
- {
|
|
|
+ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
|
|
|
if (LOG.isTraceEnabled()) {
|
|
|
- LOG.trace("Channel disconnected " + e);
|
|
|
+ LOG.trace("Channel inactive {}", ctx.channel());
|
|
|
}
|
|
|
- NettyServerCnxn cnxn = (NettyServerCnxn) ctx.getAttachment();
|
|
|
+ allChannels.remove(ctx.channel());
|
|
|
+ NettyServerCnxn cnxn = ctx.channel().attr(CONNECTION_ATTRIBUTE).getAndSet(null);
|
|
|
if (cnxn != null) {
|
|
|
if (LOG.isTraceEnabled()) {
|
|
|
- LOG.trace("Channel disconnect caused close " + e);
|
|
|
+ LOG.trace("Channel inactive caused close {}", cnxn);
|
|
|
}
|
|
|
cnxn.close();
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
|
|
|
- throws Exception
|
|
|
- {
|
|
|
- LOG.warn("Exception caught " + e, e.getCause());
|
|
|
- NettyServerCnxn cnxn = (NettyServerCnxn) ctx.getAttachment();
|
|
|
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
|
|
|
+ LOG.warn("Exception caught", cause);
|
|
|
+ NettyServerCnxn cnxn = ctx.channel().attr(CONNECTION_ATTRIBUTE).getAndSet(null);
|
|
|
if (cnxn != null) {
|
|
|
if (LOG.isDebugEnabled()) {
|
|
|
- LOG.debug("Closing " + cnxn);
|
|
|
+ LOG.debug("Closing {}", cnxn);
|
|
|
}
|
|
|
cnxn.close();
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
|
|
|
- throws Exception
|
|
|
- {
|
|
|
- if (LOG.isTraceEnabled()) {
|
|
|
- LOG.trace("message received called " + e.getMessage());
|
|
|
- }
|
|
|
+ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
|
|
|
try {
|
|
|
- if (LOG.isDebugEnabled()) {
|
|
|
- LOG.debug("New message " + e.toString()
|
|
|
- + " from " + ctx.getChannel());
|
|
|
- }
|
|
|
- NettyServerCnxn cnxn = (NettyServerCnxn)ctx.getAttachment();
|
|
|
- synchronized(cnxn) {
|
|
|
- processMessage(e, cnxn);
|
|
|
+ if (evt == NettyServerCnxn.AutoReadEvent.ENABLE) {
|
|
|
+ LOG.debug("Received AutoReadEvent.ENABLE");
|
|
|
+ NettyServerCnxn cnxn = ctx.channel().attr(CONNECTION_ATTRIBUTE).get();
|
|
|
+ // TODO(ilyam): Not sure if cnxn can be null here. It becomes null if channelInactive()
|
|
|
+ // or exceptionCaught() trigger, but it's unclear to me if userEventTriggered() can run
|
|
|
+ // after either of those. Check for null just to be safe ...
|
|
|
+ if (cnxn != null) {
|
|
|
+ cnxn.processQueuedBuffer();
|
|
|
+ }
|
|
|
+ ctx.channel().config().setAutoRead(true);
|
|
|
+ } else if (evt == NettyServerCnxn.AutoReadEvent.DISABLE) {
|
|
|
+ LOG.debug("Received AutoReadEvent.DISABLE");
|
|
|
+ ctx.channel().config().setAutoRead(false);
|
|
|
}
|
|
|
- } catch(Exception ex) {
|
|
|
- LOG.error("Unexpected exception in receive", ex);
|
|
|
- throw ex;
|
|
|
+ } finally {
|
|
|
+ ReferenceCountUtil.release(evt);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private void processMessage(MessageEvent e, NettyServerCnxn cnxn) {
|
|
|
- if (LOG.isDebugEnabled()) {
|
|
|
- LOG.debug(Long.toHexString(cnxn.sessionId) + " queuedBuffer: "
|
|
|
- + cnxn.queuedBuffer);
|
|
|
- }
|
|
|
-
|
|
|
- if (e instanceof NettyServerCnxn.ResumeMessageEvent) {
|
|
|
- LOG.debug("Received ResumeMessageEvent");
|
|
|
- if (cnxn.queuedBuffer != null) {
|
|
|
- if (LOG.isTraceEnabled()) {
|
|
|
- LOG.trace("processing queue "
|
|
|
- + Long.toHexString(cnxn.sessionId)
|
|
|
- + " queuedBuffer 0x"
|
|
|
- + ChannelBuffers.hexDump(cnxn.queuedBuffer));
|
|
|
- }
|
|
|
- cnxn.receiveMessage(cnxn.queuedBuffer);
|
|
|
- if (!cnxn.queuedBuffer.readable()) {
|
|
|
- LOG.debug("Processed queue - no bytes remaining");
|
|
|
- cnxn.queuedBuffer = null;
|
|
|
- } else {
|
|
|
- LOG.debug("Processed queue - bytes remaining");
|
|
|
- }
|
|
|
- } else {
|
|
|
- LOG.debug("queue empty");
|
|
|
- }
|
|
|
- cnxn.channel.setReadable(true);
|
|
|
- } else {
|
|
|
- ChannelBuffer buf = (ChannelBuffer)e.getMessage();
|
|
|
+ @Override
|
|
|
+ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
|
|
|
+ try {
|
|
|
if (LOG.isTraceEnabled()) {
|
|
|
- LOG.trace(Long.toHexString(cnxn.sessionId)
|
|
|
- + " buf 0x"
|
|
|
- + ChannelBuffers.hexDump(buf));
|
|
|
+ LOG.trace("message received called {}", msg);
|
|
|
}
|
|
|
-
|
|
|
- if (cnxn.throttled) {
|
|
|
- LOG.debug("Received message while throttled");
|
|
|
- // we are throttled, so we need to queue
|
|
|
- if (cnxn.queuedBuffer == null) {
|
|
|
- LOG.debug("allocating queue");
|
|
|
- cnxn.queuedBuffer = dynamicBuffer(buf.readableBytes());
|
|
|
- }
|
|
|
- cnxn.queuedBuffer.writeBytes(buf);
|
|
|
- if (LOG.isTraceEnabled()) {
|
|
|
- LOG.trace(Long.toHexString(cnxn.sessionId)
|
|
|
- + " queuedBuffer 0x"
|
|
|
- + ChannelBuffers.hexDump(cnxn.queuedBuffer));
|
|
|
+ try {
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
+ LOG.debug("New message {} from {}", msg, ctx.channel());
|
|
|
}
|
|
|
- } else {
|
|
|
- LOG.debug("not throttled");
|
|
|
- if (cnxn.queuedBuffer != null) {
|
|
|
- if (LOG.isTraceEnabled()) {
|
|
|
- LOG.trace(Long.toHexString(cnxn.sessionId)
|
|
|
- + " queuedBuffer 0x"
|
|
|
- + ChannelBuffers.hexDump(cnxn.queuedBuffer));
|
|
|
- }
|
|
|
- cnxn.queuedBuffer.writeBytes(buf);
|
|
|
- if (LOG.isTraceEnabled()) {
|
|
|
- LOG.trace(Long.toHexString(cnxn.sessionId)
|
|
|
- + " queuedBuffer 0x"
|
|
|
- + ChannelBuffers.hexDump(cnxn.queuedBuffer));
|
|
|
- }
|
|
|
-
|
|
|
- cnxn.receiveMessage(cnxn.queuedBuffer);
|
|
|
- if (!cnxn.queuedBuffer.readable()) {
|
|
|
- LOG.debug("Processed queue - no bytes remaining");
|
|
|
- cnxn.queuedBuffer = null;
|
|
|
- } else {
|
|
|
- LOG.debug("Processed queue - bytes remaining");
|
|
|
- }
|
|
|
+ NettyServerCnxn cnxn = ctx.channel().attr(CONNECTION_ATTRIBUTE).get();
|
|
|
+ if (cnxn == null) {
|
|
|
+ LOG.error("channelRead() on a closed or closing NettyServerCnxn");
|
|
|
} else {
|
|
|
- cnxn.receiveMessage(buf);
|
|
|
- if (buf.readable()) {
|
|
|
- if (LOG.isTraceEnabled()) {
|
|
|
- LOG.trace("Before copy " + buf);
|
|
|
- }
|
|
|
- cnxn.queuedBuffer = dynamicBuffer(buf.readableBytes());
|
|
|
- cnxn.queuedBuffer.writeBytes(buf);
|
|
|
- if (LOG.isTraceEnabled()) {
|
|
|
- LOG.trace("Copy is " + cnxn.queuedBuffer);
|
|
|
- LOG.trace(Long.toHexString(cnxn.sessionId)
|
|
|
- + " queuedBuffer 0x"
|
|
|
- + ChannelBuffers.hexDump(cnxn.queuedBuffer));
|
|
|
- }
|
|
|
- }
|
|
|
+ cnxn.processMessage((ByteBuf) msg);
|
|
|
}
|
|
|
+ } catch (Exception ex) {
|
|
|
+ LOG.error("Unexpected exception in receive", ex);
|
|
|
+ throw ex;
|
|
|
}
|
|
|
+ } finally {
|
|
|
+ ReferenceCountUtil.release(msg);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- public void writeComplete(ChannelHandlerContext ctx,
|
|
|
- WriteCompletionEvent e) throws Exception
|
|
|
- {
|
|
|
+ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
|
|
|
if (LOG.isTraceEnabled()) {
|
|
|
- LOG.trace("write complete " + e);
|
|
|
+ promise.addListener((future) -> {
|
|
|
+ LOG.trace("write {}",
|
|
|
+ future.isSuccess() ? "complete" : "failed");
|
|
|
+ });
|
|
|
}
|
|
|
+ super.write(ctx, msg, promise);
|
|
|
}
|
|
|
|
|
|
- private final class CertificateVerifier
|
|
|
- implements ChannelFutureListener {
|
|
|
+ private final class CertificateVerifier implements GenericFutureListener<Future<Channel>> {
|
|
|
private final SslHandler sslHandler;
|
|
|
private final NettyServerCnxn cnxn;
|
|
|
|
|
@@ -291,12 +225,13 @@ public class NettyServerCnxnFactory extends ServerCnxnFactory {
|
|
|
/**
|
|
|
* Only allow the connection to stay open if certificate passes auth
|
|
|
*/
|
|
|
- public void operationComplete(ChannelFuture future)
|
|
|
- throws SSLPeerUnverifiedException {
|
|
|
+ public void operationComplete(Future<Channel> future) throws SSLPeerUnverifiedException {
|
|
|
if (future.isSuccess()) {
|
|
|
- LOG.debug("Successful handshake with session 0x{}",
|
|
|
- Long.toHexString(cnxn.sessionId));
|
|
|
- SSLEngine eng = sslHandler.getEngine();
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
+ LOG.debug("Successful handshake with session 0x{}",
|
|
|
+ Long.toHexString(cnxn.getSessionId()));
|
|
|
+ }
|
|
|
+ SSLEngine eng = sslHandler.engine();
|
|
|
SSLSession session = eng.getSession();
|
|
|
cnxn.setClientCertificateChain(session.getPeerCertificates());
|
|
|
|
|
@@ -316,16 +251,17 @@ public class NettyServerCnxnFactory extends ServerCnxnFactory {
|
|
|
if (KeeperException.Code.OK !=
|
|
|
authProvider.handleAuthentication(cnxn, null)) {
|
|
|
LOG.error("Authentication failed for session 0x{}",
|
|
|
- Long.toHexString(cnxn.sessionId));
|
|
|
+ Long.toHexString(cnxn.getSessionId()));
|
|
|
cnxn.close();
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
- allChannels.add(future.getChannel());
|
|
|
+ final Channel futureChannel = future.getNow();
|
|
|
+ allChannels.add(Objects.requireNonNull(futureChannel));
|
|
|
addCnxn(cnxn);
|
|
|
} else {
|
|
|
LOG.error("Unsuccessful handshake with session 0x{}",
|
|
|
- Long.toHexString(cnxn.sessionId));
|
|
|
+ Long.toHexString(cnxn.getSessionId()));
|
|
|
cnxn.close();
|
|
|
}
|
|
|
}
|
|
@@ -334,30 +270,42 @@ public class NettyServerCnxnFactory extends ServerCnxnFactory {
|
|
|
|
|
|
CnxnChannelHandler channelHandler = new CnxnChannelHandler();
|
|
|
|
|
|
- NettyServerCnxnFactory() {
|
|
|
- bootstrap = new ServerBootstrap(
|
|
|
- new NioServerSocketChannelFactory(
|
|
|
- Executors.newCachedThreadPool(),
|
|
|
- Executors.newCachedThreadPool()));
|
|
|
- // parent channel
|
|
|
- bootstrap.setOption("reuseAddress", true);
|
|
|
- // child channels
|
|
|
- bootstrap.setOption("child.tcpNoDelay", true);
|
|
|
- /* set socket linger to off, so that socket close does not block */
|
|
|
- bootstrap.setOption("child.soLinger", -1);
|
|
|
- bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
|
|
|
- @Override
|
|
|
- public ChannelPipeline getPipeline() throws Exception {
|
|
|
- ChannelPipeline p = Channels.pipeline();
|
|
|
- if (secure) {
|
|
|
- initSSL(p);
|
|
|
- }
|
|
|
- p.addLast("servercnxnfactory", channelHandler);
|
|
|
+ private ServerBootstrap configureBootstrapAllocator(ServerBootstrap bootstrap) {
|
|
|
+ ByteBufAllocator testAllocator = TEST_ALLOCATOR.get();
|
|
|
+ if (testAllocator != null) {
|
|
|
+ return bootstrap
|
|
|
+ .option(ChannelOption.ALLOCATOR, testAllocator)
|
|
|
+ .childOption(ChannelOption.ALLOCATOR, testAllocator);
|
|
|
+ } else {
|
|
|
+ return bootstrap;
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
- return p;
|
|
|
- }
|
|
|
- });
|
|
|
+ NettyServerCnxnFactory() {
|
|
|
x509Util = new ClientX509Util();
|
|
|
+
|
|
|
+ EventLoopGroup bossGroup = NettyUtils.newNioOrEpollEventLoopGroup();
|
|
|
+ EventLoopGroup workerGroup = NettyUtils.newNioOrEpollEventLoopGroup();
|
|
|
+ ServerBootstrap bootstrap = new ServerBootstrap()
|
|
|
+ .group(bossGroup, workerGroup)
|
|
|
+ .channel(NettyUtils.nioOrEpollServerSocketChannel())
|
|
|
+ // parent channel options
|
|
|
+ .option(ChannelOption.SO_REUSEADDR, true)
|
|
|
+ // child channels options
|
|
|
+ .childOption(ChannelOption.TCP_NODELAY, true)
|
|
|
+ .childOption(ChannelOption.SO_LINGER, -1)
|
|
|
+ .childHandler(new ChannelInitializer<SocketChannel>() {
|
|
|
+ @Override
|
|
|
+ protected void initChannel(SocketChannel ch) throws Exception {
|
|
|
+ ChannelPipeline pipeline = ch.pipeline();
|
|
|
+ if (secure) {
|
|
|
+ initSSL(pipeline);
|
|
|
+ }
|
|
|
+ pipeline.addLast("servercnxnfactory", channelHandler);
|
|
|
+ }
|
|
|
+ });
|
|
|
+ this.bootstrap = configureBootstrapAllocator(bootstrap);
|
|
|
+ this.bootstrap.validate();
|
|
|
}
|
|
|
|
|
|
private synchronized void initSSL(ChannelPipeline p)
|
|
@@ -390,7 +338,7 @@ public class NettyServerCnxnFactory extends ServerCnxnFactory {
|
|
|
sslEngine.setNeedClientAuth(true);
|
|
|
|
|
|
p.addLast("ssl", new SslHandler(sslEngine));
|
|
|
- LOG.info("SSL handler added for channel: {}", p.getChannel());
|
|
|
+ LOG.info("SSL handler added for channel: {}", p.channel());
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -440,7 +388,7 @@ public class NettyServerCnxnFactory extends ServerCnxnFactory {
|
|
|
return localAddress.getPort();
|
|
|
}
|
|
|
|
|
|
- boolean killed;
|
|
|
+ private boolean killed; // use synchronized(this) to access
|
|
|
@Override
|
|
|
public void join() throws InterruptedException {
|
|
|
synchronized(this) {
|
|
@@ -452,16 +400,42 @@ public class NettyServerCnxnFactory extends ServerCnxnFactory {
|
|
|
|
|
|
@Override
|
|
|
public void shutdown() {
|
|
|
- LOG.info("shutdown called " + localAddress);
|
|
|
+ synchronized (this) {
|
|
|
+ if (killed) {
|
|
|
+ LOG.info("already shutdown {}", localAddress);
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ LOG.info("shutdown called {}", localAddress);
|
|
|
+
|
|
|
if (login != null) {
|
|
|
login.shutdown();
|
|
|
}
|
|
|
+
|
|
|
+ final EventLoopGroup bossGroup = bootstrap.config().group();
|
|
|
+ final EventLoopGroup workerGroup = bootstrap.config().childGroup();
|
|
|
// null if factory never started
|
|
|
if (parentChannel != null) {
|
|
|
- parentChannel.close().awaitUninterruptibly();
|
|
|
+ ChannelFuture parentCloseFuture = parentChannel.close();
|
|
|
+ if (bossGroup != null) {
|
|
|
+ parentCloseFuture.addListener(future -> {
|
|
|
+ bossGroup.shutdownGracefully();
|
|
|
+ });
|
|
|
+ }
|
|
|
closeAll();
|
|
|
- allChannels.close().awaitUninterruptibly();
|
|
|
- bootstrap.releaseExternalResources();
|
|
|
+ ChannelGroupFuture allChannelsCloseFuture = allChannels.close();
|
|
|
+ if (workerGroup != null) {
|
|
|
+ allChannelsCloseFuture.addListener(future -> {
|
|
|
+ workerGroup.shutdownGracefully();
|
|
|
+ });
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ if (bossGroup != null) {
|
|
|
+ bossGroup.shutdownGracefully();
|
|
|
+ }
|
|
|
+ if (workerGroup != null) {
|
|
|
+ workerGroup.shutdownGracefully();
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
if (zkServer != null) {
|
|
@@ -475,16 +449,23 @@ public class NettyServerCnxnFactory extends ServerCnxnFactory {
|
|
|
|
|
|
@Override
|
|
|
public void start() {
|
|
|
- LOG.info("binding to port " + localAddress);
|
|
|
- parentChannel = bootstrap.bind(localAddress);
|
|
|
+ LOG.info("binding to port {}", localAddress);
|
|
|
+ parentChannel = bootstrap.bind(localAddress).syncUninterruptibly().channel();
|
|
|
+ // Port changes after bind() if the original port was 0, update
|
|
|
+ // localAddress to get the real port.
|
|
|
+ localAddress = (InetSocketAddress) parentChannel.localAddress();
|
|
|
+ LOG.info("bound to port " + getLocalPort());
|
|
|
}
|
|
|
|
|
|
public void reconfigure(InetSocketAddress addr) {
|
|
|
Channel oldChannel = parentChannel;
|
|
|
try {
|
|
|
LOG.info("binding to port {}", addr);
|
|
|
- parentChannel = bootstrap.bind(addr);
|
|
|
- localAddress = addr;
|
|
|
+ parentChannel = bootstrap.bind(addr).syncUninterruptibly().channel();
|
|
|
+ // Port changes after bind() if the original port was 0, update
|
|
|
+ // localAddress to get the real port.
|
|
|
+ localAddress = (InetSocketAddress) parentChannel.localAddress();
|
|
|
+ LOG.info("bound to port " + getLocalPort());
|
|
|
} catch (Exception e) {
|
|
|
LOG.error("Error while reconfiguring", e);
|
|
|
} finally {
|
|
@@ -517,21 +498,39 @@ public class NettyServerCnxnFactory extends ServerCnxnFactory {
|
|
|
cnxns.add(cnxn);
|
|
|
synchronized (ipMap){
|
|
|
InetAddress addr =
|
|
|
- ((InetSocketAddress)cnxn.channel.getRemoteAddress())
|
|
|
- .getAddress();
|
|
|
+ ((InetSocketAddress)cnxn.getChannel().remoteAddress()).getAddress();
|
|
|
Set<NettyServerCnxn> s = ipMap.get(addr);
|
|
|
if (s == null) {
|
|
|
- s = new HashSet<NettyServerCnxn>();
|
|
|
+ s = new HashSet<>();
|
|
|
+ ipMap.put(addr, s);
|
|
|
}
|
|
|
s.add(cnxn);
|
|
|
- ipMap.put(addr,s);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ void removeCnxnFromIpMap(NettyServerCnxn cnxn, InetAddress remoteAddress) {
|
|
|
+ synchronized (ipMap) {
|
|
|
+ Set<NettyServerCnxn> s = ipMap.get(remoteAddress);
|
|
|
+ if (s != null) {
|
|
|
+ s.remove(cnxn);
|
|
|
+ if (s.isEmpty()) {
|
|
|
+ ipMap.remove(remoteAddress);
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ LOG.error(
|
|
|
+ "Unexpected null set for remote address {} when removing cnxn {}",
|
|
|
+ remoteAddress,
|
|
|
+ cnxn);
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
|
|
|
private int getClientCnxnCount(InetAddress addr) {
|
|
|
- Set<NettyServerCnxn> s = ipMap.get(addr);
|
|
|
- if (s == null) return 0;
|
|
|
- return s.size();
|
|
|
+ synchronized (ipMap) {
|
|
|
+ Set<NettyServerCnxn> s = ipMap.get(addr);
|
|
|
+ if (s == null) return 0;
|
|
|
+ return s.size();
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -552,4 +551,23 @@ public class NettyServerCnxnFactory extends ServerCnxnFactory {
|
|
|
return info;
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Sets the test ByteBufAllocator. This allocator will be used by all
|
|
|
+ * future instances of this class.
|
|
|
+ * It is not recommended to use this method outside of testing.
|
|
|
+ * @param allocator the ByteBufAllocator to use for all netty buffer
|
|
|
+ * allocations.
|
|
|
+ */
|
|
|
+ static void setTestAllocator(ByteBufAllocator allocator) {
|
|
|
+ TEST_ALLOCATOR.set(allocator);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Clears the test ByteBufAllocator. The default allocator will be used
|
|
|
+ * by all future instances of this class.
|
|
|
+ * It is not recommended to use this method outside of testing.
|
|
|
+ */
|
|
|
+ static void clearTestAllocator() {
|
|
|
+ TEST_ALLOCATOR.set(null);
|
|
|
+ }
|
|
|
}
|