|
@@ -25,6 +25,7 @@ import java.security.KeyManagementException;
|
|
|
import java.security.NoSuchAlgorithmException;
|
|
|
import java.util.HashMap;
|
|
|
import java.util.HashSet;
|
|
|
+import java.util.List;
|
|
|
import java.util.Map;
|
|
|
import java.util.Objects;
|
|
|
import java.util.Set;
|
|
@@ -32,6 +33,7 @@ import java.util.concurrent.atomic.AtomicReference;
|
|
|
|
|
|
import javax.net.ssl.SSLContext;
|
|
|
import javax.net.ssl.SSLEngine;
|
|
|
+import javax.net.ssl.SSLException;
|
|
|
import javax.net.ssl.SSLPeerUnverifiedException;
|
|
|
import javax.net.ssl.SSLSession;
|
|
|
import javax.net.ssl.X509KeyManager;
|
|
@@ -43,6 +45,7 @@ import io.netty.buffer.ByteBufAllocator;
|
|
|
import io.netty.channel.Channel;
|
|
|
import io.netty.channel.ChannelDuplexHandler;
|
|
|
import io.netty.channel.ChannelFuture;
|
|
|
+import io.netty.channel.ChannelHandler;
|
|
|
import io.netty.channel.ChannelHandler.Sharable;
|
|
|
import io.netty.channel.ChannelHandlerContext;
|
|
|
import io.netty.channel.ChannelInitializer;
|
|
@@ -54,6 +57,9 @@ import io.netty.channel.group.ChannelGroup;
|
|
|
import io.netty.channel.group.ChannelGroupFuture;
|
|
|
import io.netty.channel.group.DefaultChannelGroup;
|
|
|
import io.netty.channel.socket.SocketChannel;
|
|
|
+import io.netty.handler.ssl.OpenSsl;
|
|
|
+import io.netty.handler.ssl.OptionalSslHandler;
|
|
|
+import io.netty.handler.ssl.SslContext;
|
|
|
import io.netty.handler.ssl.SslHandler;
|
|
|
import io.netty.util.AttributeKey;
|
|
|
import io.netty.util.ReferenceCountUtil;
|
|
@@ -63,16 +69,32 @@ import io.netty.util.concurrent.GenericFutureListener;
|
|
|
import org.apache.zookeeper.KeeperException;
|
|
|
import org.apache.zookeeper.common.ClientX509Util;
|
|
|
import org.apache.zookeeper.common.NettyUtils;
|
|
|
+import org.apache.zookeeper.common.SSLContextAndOptions;
|
|
|
import org.apache.zookeeper.common.X509Exception;
|
|
|
import org.apache.zookeeper.common.X509Exception.SSLContextException;
|
|
|
import org.apache.zookeeper.server.auth.ProviderRegistry;
|
|
|
import org.apache.zookeeper.server.auth.X509AuthenticationProvider;
|
|
|
+import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
|
|
|
import org.slf4j.Logger;
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
|
|
|
public class NettyServerCnxnFactory extends ServerCnxnFactory {
|
|
|
private static final Logger LOG = LoggerFactory.getLogger(NettyServerCnxnFactory.class);
|
|
|
|
|
|
+ /**
|
|
|
+ * Allow client-server sockets to accept both SSL and plaintext connections
|
|
|
+ */
|
|
|
+ public static final String PORT_UNIFICATION_KEY = "zookeeper.client.portUnification";
|
|
|
+ private final boolean shouldUsePortUnification;
|
|
|
+
|
|
|
+ /**
|
|
|
+ * The first byte in TLS protocol is the content type of the subsequent record.
|
|
|
+ * Handshakes use value 22 (0x16) so the first byte offered on any TCP connection
|
|
|
+ * attempting to establish a TLS connection will be this value.
|
|
|
+ * https://tools.ietf.org/html/rfc8446#page-79
|
|
|
+ */
|
|
|
+ private static final byte TLS_HANDSHAKE_RECORD_TYPE = 0x16;
|
|
|
+
|
|
|
private final ServerBootstrap bootstrap;
|
|
|
private Channel parentChannel;
|
|
|
private final ChannelGroup allChannels =
|
|
@@ -91,6 +113,66 @@ public class NettyServerCnxnFactory extends ServerCnxnFactory {
|
|
|
private static final AtomicReference<ByteBufAllocator> TEST_ALLOCATOR =
|
|
|
new AtomicReference<>(null);
|
|
|
|
|
|
+ /**
|
|
|
+ * A handler that detects whether the client would like to use
|
|
|
+ * TLS or not and responds in kind. The first bytes are examined
|
|
|
+ * for the static TLS headers to make the determination and
|
|
|
+ * placed back in the stream with the correct ChannelHandler
|
|
|
+ * instantiated.
|
|
|
+ */
|
|
|
+ class DualModeSslHandler extends OptionalSslHandler {
|
|
|
+ DualModeSslHandler(SslContext sslContext) {
|
|
|
+ super(sslContext);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ protected void decode(ChannelHandlerContext context, ByteBuf in, List<Object> out) throws Exception {
|
|
|
+ if (in.readableBytes() >= 5) {
|
|
|
+ super.decode(context, in, out);
|
|
|
+ } else if (in.readableBytes() > 0) {
|
|
|
+ // It requires 5 bytes to detect a proper ssl connection. In the
|
|
|
+ // case that the server receives fewer, check if we can fail to plaintext.
|
|
|
+ // This will occur when for any four letter work commands.
|
|
|
+ if (TLS_HANDSHAKE_RECORD_TYPE != in.getByte(0)) {
|
|
|
+ LOG.debug("first byte {} does not match TLS handshake, failing to plaintext", in.getByte(0));
|
|
|
+ handleNonSsl(context);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * pulled directly from OptionalSslHandler to allow for access
|
|
|
+ * @param context
|
|
|
+ */
|
|
|
+ private void handleNonSsl(ChannelHandlerContext context) {
|
|
|
+ ChannelHandler handler = this.newNonSslHandler(context);
|
|
|
+ if (handler != null) {
|
|
|
+ context.pipeline().replace(this, this.newNonSslHandlerName(), handler);
|
|
|
+ } else {
|
|
|
+ context.pipeline().remove(this);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ protected SslHandler newSslHandler(ChannelHandlerContext context, SslContext sslContext) {
|
|
|
+ NettyServerCnxn cnxn = Objects.requireNonNull(context.channel().attr(CONNECTION_ATTRIBUTE).get());
|
|
|
+ LOG.debug("creating ssl handler for session {}", cnxn.getSessionId());
|
|
|
+ SslHandler handler = super.newSslHandler(context, sslContext);
|
|
|
+ Future<Channel> handshakeFuture = handler.handshakeFuture();
|
|
|
+ handshakeFuture.addListener(new CertificateVerifier(handler, cnxn));
|
|
|
+ return handler;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ protected ChannelHandler newNonSslHandler(ChannelHandlerContext context) {
|
|
|
+ NettyServerCnxn cnxn = Objects.requireNonNull(context.channel().attr(CONNECTION_ATTRIBUTE).get());
|
|
|
+ LOG.debug("creating plaintext handler for session {}", cnxn.getSessionId());
|
|
|
+ allChannels.add(context.channel());
|
|
|
+ addCnxn(cnxn);
|
|
|
+ return super.newNonSslHandler(context);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* This is an inner class since we need to extend ChannelDuplexHandler, but
|
|
|
* NettyServerCnxnFactory already extends ServerCnxnFactory. By making it inner
|
|
@@ -124,7 +206,7 @@ public class NettyServerCnxnFactory extends ServerCnxnFactory {
|
|
|
SslHandler sslHandler = ctx.pipeline().get(SslHandler.class);
|
|
|
Future<Channel> handshakeFuture = sslHandler.handshakeFuture();
|
|
|
handshakeFuture.addListener(new CertificateVerifier(sslHandler, cnxn));
|
|
|
- } else {
|
|
|
+ } else if (!shouldUsePortUnification) {
|
|
|
allChannels.add(ctx.channel());
|
|
|
addCnxn(cnxn);
|
|
|
}
|
|
@@ -218,28 +300,51 @@ public class NettyServerCnxnFactory extends ServerCnxnFactory {
|
|
|
}
|
|
|
super.write(ctx, msg, promise);
|
|
|
}
|
|
|
+ }
|
|
|
|
|
|
- private final class CertificateVerifier implements GenericFutureListener<Future<Channel>> {
|
|
|
- private final SslHandler sslHandler;
|
|
|
- private final NettyServerCnxn cnxn;
|
|
|
+ final class CertificateVerifier implements GenericFutureListener<Future<Channel>> {
|
|
|
+ private final SslHandler sslHandler;
|
|
|
+ private final NettyServerCnxn cnxn;
|
|
|
|
|
|
- CertificateVerifier(SslHandler sslHandler, NettyServerCnxn cnxn) {
|
|
|
- this.sslHandler = sslHandler;
|
|
|
- this.cnxn = cnxn;
|
|
|
- }
|
|
|
+ CertificateVerifier(SslHandler sslHandler, NettyServerCnxn cnxn) {
|
|
|
+ this.sslHandler = sslHandler;
|
|
|
+ this.cnxn = cnxn;
|
|
|
+ }
|
|
|
|
|
|
- /**
|
|
|
- * Only allow the connection to stay open if certificate passes auth
|
|
|
- */
|
|
|
- public void operationComplete(Future<Channel> future) throws SSLPeerUnverifiedException {
|
|
|
- if (future.isSuccess()) {
|
|
|
- if (LOG.isDebugEnabled()) {
|
|
|
- LOG.debug("Successful handshake with session 0x{}",
|
|
|
- Long.toHexString(cnxn.getSessionId()));
|
|
|
- }
|
|
|
- SSLEngine eng = sslHandler.engine();
|
|
|
+ /**
|
|
|
+ * Only allow the connection to stay open if certificate passes auth
|
|
|
+ */
|
|
|
+ public void operationComplete(Future<Channel> future) {
|
|
|
+ if (future.isSuccess()) {
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
+ LOG.debug("Successful handshake with session 0x{}",
|
|
|
+ Long.toHexString(cnxn.getSessionId()));
|
|
|
+ }
|
|
|
+ SSLEngine eng = sslHandler.engine();
|
|
|
+ // Don't try to verify certificate if we didn't ask client to present one
|
|
|
+ if (eng.getNeedClientAuth() || eng.getWantClientAuth()) {
|
|
|
SSLSession session = eng.getSession();
|
|
|
- cnxn.setClientCertificateChain(session.getPeerCertificates());
|
|
|
+ try {
|
|
|
+ cnxn.setClientCertificateChain(session.getPeerCertificates());
|
|
|
+ } catch (SSLPeerUnverifiedException e) {
|
|
|
+ if (eng.getNeedClientAuth()) {
|
|
|
+ // Certificate was requested but not present
|
|
|
+ LOG.error("Error getting peer certificates", e);
|
|
|
+ cnxn.close();
|
|
|
+ return;
|
|
|
+ } else {
|
|
|
+ // Certificate was requested but was optional
|
|
|
+ // TODO: what auth info should we set on the connection?
|
|
|
+ final Channel futureChannel = future.getNow();
|
|
|
+ allChannels.add(Objects.requireNonNull(futureChannel));
|
|
|
+ addCnxn(cnxn);
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ LOG.error("Error getting peer certificates", e);
|
|
|
+ cnxn.close();
|
|
|
+ return;
|
|
|
+ }
|
|
|
|
|
|
String authProviderProp
|
|
|
= System.getProperty(x509Util.getSslAuthProviderProperty(), "x509");
|
|
@@ -249,7 +354,7 @@ public class NettyServerCnxnFactory extends ServerCnxnFactory {
|
|
|
ProviderRegistry.getProvider(authProviderProp);
|
|
|
|
|
|
if (authProvider == null) {
|
|
|
- LOG.error("Auth provider not found: {}", authProviderProp);
|
|
|
+ LOG.error("X509 Auth provider not found: {}", authProviderProp);
|
|
|
cnxn.close();
|
|
|
return;
|
|
|
}
|
|
@@ -261,15 +366,15 @@ public class NettyServerCnxnFactory extends ServerCnxnFactory {
|
|
|
cnxn.close();
|
|
|
return;
|
|
|
}
|
|
|
-
|
|
|
- final Channel futureChannel = future.getNow();
|
|
|
- allChannels.add(Objects.requireNonNull(futureChannel));
|
|
|
- addCnxn(cnxn);
|
|
|
- } else {
|
|
|
- LOG.error("Unsuccessful handshake with session 0x{}",
|
|
|
- Long.toHexString(cnxn.getSessionId()));
|
|
|
- cnxn.close();
|
|
|
}
|
|
|
+
|
|
|
+ final Channel futureChannel = future.getNow();
|
|
|
+ allChannels.add(Objects.requireNonNull(futureChannel));
|
|
|
+ addCnxn(cnxn);
|
|
|
+ } else {
|
|
|
+ LOG.error("Unsuccessful handshake with session 0x{}",
|
|
|
+ Long.toHexString(cnxn.getSessionId()));
|
|
|
+ cnxn.close();
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -290,6 +395,18 @@ public class NettyServerCnxnFactory extends ServerCnxnFactory {
|
|
|
NettyServerCnxnFactory() {
|
|
|
x509Util = new ClientX509Util();
|
|
|
|
|
|
+ boolean usePortUnification = Boolean.getBoolean(PORT_UNIFICATION_KEY);
|
|
|
+ LOG.info("{}={}", PORT_UNIFICATION_KEY, usePortUnification);
|
|
|
+ if (usePortUnification) {
|
|
|
+ try {
|
|
|
+ QuorumPeerConfig.configureSSLAuth();
|
|
|
+ } catch (QuorumPeerConfig.ConfigException e) {
|
|
|
+ LOG.error("unable to set up SslAuthProvider, turning off client port unification", e);
|
|
|
+ usePortUnification = false;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ this.shouldUsePortUnification = usePortUnification;
|
|
|
+
|
|
|
EventLoopGroup bossGroup = NettyUtils.newNioOrEpollEventLoopGroup(
|
|
|
NettyUtils.getClientReachableLocalInetAddressCount());
|
|
|
EventLoopGroup workerGroup = NettyUtils.newNioOrEpollEventLoopGroup();
|
|
@@ -306,7 +423,9 @@ public class NettyServerCnxnFactory extends ServerCnxnFactory {
|
|
|
protected void initChannel(SocketChannel ch) throws Exception {
|
|
|
ChannelPipeline pipeline = ch.pipeline();
|
|
|
if (secure) {
|
|
|
- initSSL(pipeline);
|
|
|
+ initSSL(pipeline, false);
|
|
|
+ } else if (shouldUsePortUnification) {
|
|
|
+ initSSL(pipeline, true);
|
|
|
}
|
|
|
pipeline.addLast("servercnxnfactory", channelHandler);
|
|
|
}
|
|
@@ -315,37 +434,41 @@ public class NettyServerCnxnFactory extends ServerCnxnFactory {
|
|
|
this.bootstrap.validate();
|
|
|
}
|
|
|
|
|
|
- private synchronized void initSSL(ChannelPipeline p)
|
|
|
+ private synchronized void initSSL(ChannelPipeline p, boolean supportPlaintext)
|
|
|
throws X509Exception, KeyManagementException, NoSuchAlgorithmException {
|
|
|
String authProviderProp = System.getProperty(x509Util.getSslAuthProviderProperty());
|
|
|
- SSLContext sslContext;
|
|
|
+ SslContext nettySslContext;
|
|
|
if (authProviderProp == null) {
|
|
|
- sslContext = x509Util.getDefaultSSLContext();
|
|
|
+ SSLContextAndOptions sslContextAndOptions = x509Util.getDefaultSSLContextAndOptions();
|
|
|
+ nettySslContext = sslContextAndOptions.createNettyJdkSslContext(
|
|
|
+ sslContextAndOptions.getSSLContext(), false);
|
|
|
} else {
|
|
|
- sslContext = SSLContext.getInstance("TLSv1");
|
|
|
+ SSLContext sslContext = SSLContext.getInstance(ClientX509Util.DEFAULT_PROTOCOL);
|
|
|
X509AuthenticationProvider authProvider =
|
|
|
- (X509AuthenticationProvider)ProviderRegistry.getProvider(
|
|
|
+ (X509AuthenticationProvider) ProviderRegistry.getProvider(
|
|
|
System.getProperty(x509Util.getSslAuthProviderProperty(), "x509"));
|
|
|
|
|
|
- if (authProvider == null)
|
|
|
- {
|
|
|
+ if (authProvider == null) {
|
|
|
LOG.error("Auth provider not found: {}", authProviderProp);
|
|
|
throw new SSLContextException(
|
|
|
"Could not create SSLContext with specified auth provider: " +
|
|
|
- authProviderProp);
|
|
|
+ authProviderProp);
|
|
|
}
|
|
|
|
|
|
- sslContext.init(new X509KeyManager[] { authProvider.getKeyManager() },
|
|
|
- new X509TrustManager[] { authProvider.getTrustManager() },
|
|
|
- null);
|
|
|
+ sslContext.init(new X509KeyManager[]{authProvider.getKeyManager()},
|
|
|
+ new X509TrustManager[]{authProvider.getTrustManager()},
|
|
|
+ null);
|
|
|
+ nettySslContext = x509Util.getDefaultSSLContextAndOptions()
|
|
|
+ .createNettyJdkSslContext(sslContext,false);
|
|
|
}
|
|
|
|
|
|
- SSLEngine sslEngine = sslContext.createSSLEngine();
|
|
|
- sslEngine.setUseClientMode(false);
|
|
|
- sslEngine.setNeedClientAuth(true);
|
|
|
-
|
|
|
- p.addLast("ssl", new SslHandler(sslEngine));
|
|
|
- LOG.info("SSL handler added for channel: {}", p.channel());
|
|
|
+ if (supportPlaintext) {
|
|
|
+ p.addLast("ssl", new DualModeSslHandler(nettySslContext));
|
|
|
+ LOG.debug("dual mode SSL handler added for channel: {}", p.channel());
|
|
|
+ } else {
|
|
|
+ p.addLast("ssl", nettySslContext.newHandler(p.channel().alloc()));
|
|
|
+ LOG.debug("SSL handler added for channel: {}", p.channel());
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
@Override
|