|
@@ -23,6 +23,7 @@ import java.util.ArrayList;
|
|
|
import java.util.LinkedList;
|
|
|
import java.util.List;
|
|
|
|
|
|
+import org.apache.commons.lang3.StringUtils;
|
|
|
import org.apache.curator.framework.AuthInfo;
|
|
|
import org.apache.curator.framework.CuratorFramework;
|
|
|
import org.apache.curator.framework.CuratorFrameworkFactory;
|
|
@@ -39,13 +40,17 @@ import org.apache.zookeeper.CreateMode;
|
|
|
import org.apache.zookeeper.Watcher;
|
|
|
import org.apache.zookeeper.ZooKeeper;
|
|
|
import org.apache.zookeeper.client.ZKClientConfig;
|
|
|
+import org.apache.zookeeper.common.ClientX509Util;
|
|
|
import org.apache.zookeeper.data.ACL;
|
|
|
import org.apache.zookeeper.data.Stat;
|
|
|
+
|
|
|
import org.slf4j.Logger;
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
|
|
|
import org.apache.hadoop.util.Preconditions;
|
|
|
|
|
|
+import javax.naming.ConfigurationException;
|
|
|
+
|
|
|
/**
|
|
|
* Helper class that provides utility methods specific to ZK operations.
|
|
|
*/
|
|
@@ -122,7 +127,7 @@ public final class ZKCuratorManager {
|
|
|
* Start the connection to the ZooKeeper ensemble.
|
|
|
* @throws IOException If the connection cannot be started.
|
|
|
*/
|
|
|
- public void start() throws IOException {
|
|
|
+ public void start() throws IOException{
|
|
|
this.start(new ArrayList<>());
|
|
|
}
|
|
|
|
|
@@ -132,6 +137,20 @@ public final class ZKCuratorManager {
|
|
|
* @throws IOException If the connection cannot be started.
|
|
|
*/
|
|
|
public void start(List<AuthInfo> authInfos) throws IOException {
|
|
|
+ this.start(authInfos, false);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Start the connection to the ZooKeeper ensemble.
|
|
|
+ *
|
|
|
+ * @param authInfos List of authentication keys.
|
|
|
+ * @param sslEnabled If the connection should be SSL/TLS encrypted.
|
|
|
+ * @throws IOException If the connection cannot be started.
|
|
|
+ */
|
|
|
+ public void start(List<AuthInfo> authInfos, boolean sslEnabled)
|
|
|
+ throws IOException{
|
|
|
+
|
|
|
+ ZKClientConfig zkClientConfig = new ZKClientConfig();
|
|
|
|
|
|
// Connect to the ZooKeeper ensemble
|
|
|
String zkHostPort = conf.get(CommonConfigurationKeys.ZK_ADDRESS);
|
|
@@ -139,6 +158,8 @@ public final class ZKCuratorManager {
|
|
|
throw new IOException(
|
|
|
CommonConfigurationKeys.ZK_ADDRESS + " is not configured.");
|
|
|
}
|
|
|
+ LOG.debug("Configured {} as {}", CommonConfigurationKeys.ZK_ADDRESS, zkHostPort);
|
|
|
+
|
|
|
int numRetries = conf.getInt(CommonConfigurationKeys.ZK_NUM_RETRIES,
|
|
|
CommonConfigurationKeys.ZK_NUM_RETRIES_DEFAULT);
|
|
|
int zkSessionTimeout = conf.getInt(CommonConfigurationKeys.ZK_TIMEOUT_MS,
|
|
@@ -156,21 +177,49 @@ public final class ZKCuratorManager {
|
|
|
for (ZKUtil.ZKAuthInfo zkAuth : zkAuths) {
|
|
|
authInfos.add(new AuthInfo(zkAuth.getScheme(), zkAuth.getAuth()));
|
|
|
}
|
|
|
-
|
|
|
- CuratorFramework client = CuratorFrameworkFactory.builder()
|
|
|
- .connectString(zkHostPort)
|
|
|
- .zookeeperFactory(new HadoopZookeeperFactory(
|
|
|
- conf.get(CommonConfigurationKeys.ZK_SERVER_PRINCIPAL),
|
|
|
- conf.get(CommonConfigurationKeys.ZK_KERBEROS_PRINCIPAL),
|
|
|
- conf.get(CommonConfigurationKeys.ZK_KERBEROS_KEYTAB)))
|
|
|
- .sessionTimeoutMs(zkSessionTimeout)
|
|
|
- .retryPolicy(retryPolicy)
|
|
|
- .authorization(authInfos)
|
|
|
- .build();
|
|
|
+ if (sslEnabled) {
|
|
|
+ validateSslConfiguration(conf);
|
|
|
+ }
|
|
|
+ CuratorFramework client = CuratorFrameworkFactory.builder().connectString(zkHostPort)
|
|
|
+ .zookeeperFactory(
|
|
|
+ new HadoopZookeeperFactory(conf.get(CommonConfigurationKeys.ZK_SERVER_PRINCIPAL),
|
|
|
+ conf.get(CommonConfigurationKeys.ZK_KERBEROS_PRINCIPAL),
|
|
|
+ conf.get(CommonConfigurationKeys.ZK_KERBEROS_KEYTAB), sslEnabled,
|
|
|
+ new TruststoreKeystore(conf))).zkClientConfig(zkClientConfig)
|
|
|
+ .sessionTimeoutMs(zkSessionTimeout).retryPolicy(retryPolicy)
|
|
|
+ .authorization(authInfos).build();
|
|
|
client.start();
|
|
|
|
|
|
this.curator = client;
|
|
|
}
|
|
|
+ /* Check on SSL/TLS client connection requirements to emit the name of the
|
|
|
+ configuration missing. It improves supportability. */
|
|
|
+ private void validateSslConfiguration(Configuration config) throws IOException {
|
|
|
+ if (StringUtils.isEmpty(config.get(CommonConfigurationKeys.ZK_SSL_KEYSTORE_LOCATION))) {
|
|
|
+ throw new IOException(
|
|
|
+ "The SSL encryption is enabled for the component's ZooKeeper client connection, "
|
|
|
+ + "however the " + CommonConfigurationKeys.ZK_SSL_KEYSTORE_LOCATION + " " +
|
|
|
+ "parameter is empty.");
|
|
|
+ }
|
|
|
+ if (StringUtils.isEmpty(config.get(CommonConfigurationKeys.ZK_SSL_KEYSTORE_PASSWORD))) {
|
|
|
+ throw new IOException(
|
|
|
+ "The SSL encryption is enabled for the component's " + "ZooKeeper client connection, "
|
|
|
+ + "however the " + CommonConfigurationKeys.ZK_SSL_KEYSTORE_PASSWORD + " " +
|
|
|
+ "parameter is empty.");
|
|
|
+ }
|
|
|
+ if (StringUtils.isEmpty(config.get(CommonConfigurationKeys.ZK_SSL_TRUSTSTORE_LOCATION))) {
|
|
|
+ throw new IOException(
|
|
|
+ "The SSL encryption is enabled for the component's ZooKeeper client connection, "
|
|
|
+ + "however the " + CommonConfigurationKeys.ZK_SSL_TRUSTSTORE_LOCATION + " " +
|
|
|
+ "parameter is empty.");
|
|
|
+ }
|
|
|
+ if (StringUtils.isEmpty(config.get(CommonConfigurationKeys.ZK_SSL_TRUSTSTORE_PASSWORD))) {
|
|
|
+ throw new IOException(
|
|
|
+ "The SSL encryption is enabled for the component's ZooKeeper client connection, "
|
|
|
+ + "however the " + CommonConfigurationKeys.ZK_SSL_TRUSTSTORE_PASSWORD + " " +
|
|
|
+ "parameter is empty.");
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
/**
|
|
|
* Get ACLs for a ZNode.
|
|
@@ -414,14 +463,14 @@ public final class ZKCuratorManager {
|
|
|
throws Exception {
|
|
|
this.fencingNodePath = fencingNodePath;
|
|
|
curatorOperations.add(curator.transactionOp().create()
|
|
|
- .withMode(CreateMode.PERSISTENT)
|
|
|
- .withACL(fencingACL)
|
|
|
- .forPath(fencingNodePath, new byte[0]));
|
|
|
+ .withMode(CreateMode.PERSISTENT)
|
|
|
+ .withACL(fencingACL)
|
|
|
+ .forPath(fencingNodePath, new byte[0]));
|
|
|
}
|
|
|
|
|
|
public void commit() throws Exception {
|
|
|
curatorOperations.add(curator.transactionOp().delete()
|
|
|
- .forPath(fencingNodePath));
|
|
|
+ .forPath(fencingNodePath));
|
|
|
curator.transaction().forOperations(curatorOperations);
|
|
|
curatorOperations.clear();
|
|
|
}
|
|
@@ -429,21 +478,21 @@ public final class ZKCuratorManager {
|
|
|
public void create(String path, byte[] data, List<ACL> acl, CreateMode mode)
|
|
|
throws Exception {
|
|
|
curatorOperations.add(curator.transactionOp().create()
|
|
|
- .withMode(mode)
|
|
|
- .withACL(acl)
|
|
|
- .forPath(path, data));
|
|
|
+ .withMode(mode)
|
|
|
+ .withACL(acl)
|
|
|
+ .forPath(path, data));
|
|
|
}
|
|
|
|
|
|
public void delete(String path) throws Exception {
|
|
|
curatorOperations.add(curator.transactionOp().delete()
|
|
|
- .forPath(path));
|
|
|
+ .forPath(path));
|
|
|
}
|
|
|
|
|
|
public void setData(String path, byte[] data, int version)
|
|
|
throws Exception {
|
|
|
curatorOperations.add(curator.transactionOp().setData()
|
|
|
- .withVersion(version)
|
|
|
- .forPath(path, data));
|
|
|
+ .withVersion(version)
|
|
|
+ .forPath(path, data));
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -452,21 +501,53 @@ public final class ZKCuratorManager {
|
|
|
private final String zkPrincipal;
|
|
|
private final String kerberosPrincipal;
|
|
|
private final String kerberosKeytab;
|
|
|
+ private final Boolean sslEnabled;
|
|
|
+ private final TruststoreKeystore truststoreKeystore;
|
|
|
|
|
|
+ /**
|
|
|
+ * Constructor for the helper class to configure the ZooKeeper client connection.
|
|
|
+ * @param zkPrincipal Optional.
|
|
|
+ */
|
|
|
public HadoopZookeeperFactory(String zkPrincipal) {
|
|
|
this(zkPrincipal, null, null);
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Constructor for the helper class to configure the ZooKeeper client connection.
|
|
|
+ * @param zkPrincipal Optional.
|
|
|
+ * @param kerberosPrincipal Optional. Use along with kerberosKeytab.
|
|
|
+ * @param kerberosKeytab Optional. Use along with kerberosPrincipal.
|
|
|
+ */
|
|
|
public HadoopZookeeperFactory(String zkPrincipal, String kerberosPrincipal,
|
|
|
String kerberosKeytab) {
|
|
|
+ this(zkPrincipal, kerberosPrincipal, kerberosKeytab, false,
|
|
|
+ new TruststoreKeystore(new Configuration()));
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Constructor for the helper class to configure the ZooKeeper client connection.
|
|
|
+ *
|
|
|
+ * @param zkPrincipal Optional.
|
|
|
+ * @param kerberosPrincipal Optional. Use along with kerberosKeytab.
|
|
|
+ * @param kerberosKeytab Optional. Use along with kerberosPrincipal.
|
|
|
+ * @param sslEnabled Flag to enable SSL/TLS ZK client connection for each component
|
|
|
+ * independently.
|
|
|
+ * @param truststoreKeystore TruststoreKeystore object containing the keystoreLocation,
|
|
|
+ * keystorePassword, truststoreLocation, truststorePassword for
|
|
|
+ * SSL/TLS connection when sslEnabled is set to true.
|
|
|
+ */
|
|
|
+ public HadoopZookeeperFactory(String zkPrincipal, String kerberosPrincipal,
|
|
|
+ String kerberosKeytab, boolean sslEnabled, TruststoreKeystore truststoreKeystore) {
|
|
|
this.zkPrincipal = zkPrincipal;
|
|
|
this.kerberosPrincipal = kerberosPrincipal;
|
|
|
this.kerberosKeytab = kerberosKeytab;
|
|
|
+ this.sslEnabled = sslEnabled;
|
|
|
+ this.truststoreKeystore = truststoreKeystore;
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public ZooKeeper newZooKeeper(String connectString, int sessionTimeout,
|
|
|
- Watcher watcher, boolean canBeReadOnly
|
|
|
+ Watcher watcher, boolean canBeReadOnly
|
|
|
) throws Exception {
|
|
|
ZKClientConfig zkClientConfig = new ZKClientConfig();
|
|
|
if (zkPrincipal != null) {
|
|
@@ -478,10 +559,65 @@ public final class ZKCuratorManager {
|
|
|
if (zkClientConfig.isSaslClientEnabled() && !isJaasConfigurationSet(zkClientConfig)) {
|
|
|
setJaasConfiguration(zkClientConfig);
|
|
|
}
|
|
|
+ if (sslEnabled) {
|
|
|
+ setSslConfiguration(zkClientConfig);
|
|
|
+ }
|
|
|
return new ZooKeeper(connectString, sessionTimeout, watcher,
|
|
|
canBeReadOnly, zkClientConfig);
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Configure ZooKeeper Client with SSL/TLS connection.
|
|
|
+ * @param zkClientConfig ZooKeeper Client configuration
|
|
|
+ */
|
|
|
+ private void setSslConfiguration(ZKClientConfig zkClientConfig) throws ConfigurationException {
|
|
|
+ this.setSslConfiguration(zkClientConfig, new ClientX509Util());
|
|
|
+ }
|
|
|
+
|
|
|
+ private void setSslConfiguration(ZKClientConfig zkClientConfig, ClientX509Util x509Util)
|
|
|
+ throws ConfigurationException {
|
|
|
+ validateSslConfiguration();
|
|
|
+ LOG.info("Configuring the ZooKeeper client to use SSL/TLS encryption for connecting to the "
|
|
|
+ + "ZooKeeper server.");
|
|
|
+ LOG.debug("Configuring the ZooKeeper client with {} location: {}.",
|
|
|
+ this.truststoreKeystore.keystoreLocation,
|
|
|
+ CommonConfigurationKeys.ZK_SSL_KEYSTORE_LOCATION);
|
|
|
+ LOG.debug("Configuring the ZooKeeper client with {} location: {}.",
|
|
|
+ this.truststoreKeystore.truststoreLocation,
|
|
|
+ CommonConfigurationKeys.ZK_SSL_TRUSTSTORE_LOCATION);
|
|
|
+
|
|
|
+ zkClientConfig.setProperty(ZKClientConfig.SECURE_CLIENT, "true");
|
|
|
+ zkClientConfig.setProperty(ZKClientConfig.ZOOKEEPER_CLIENT_CNXN_SOCKET,
|
|
|
+ "org.apache.zookeeper.ClientCnxnSocketNetty");
|
|
|
+ zkClientConfig.setProperty(x509Util.getSslKeystoreLocationProperty(),
|
|
|
+ this.truststoreKeystore.keystoreLocation);
|
|
|
+ zkClientConfig.setProperty(x509Util.getSslKeystorePasswdProperty(),
|
|
|
+ this.truststoreKeystore.keystorePassword);
|
|
|
+ zkClientConfig.setProperty(x509Util.getSslTruststoreLocationProperty(),
|
|
|
+ this.truststoreKeystore.truststoreLocation);
|
|
|
+ zkClientConfig.setProperty(x509Util.getSslTruststorePasswdProperty(),
|
|
|
+ this.truststoreKeystore.truststorePassword);
|
|
|
+ }
|
|
|
+
|
|
|
+ private void validateSslConfiguration() throws ConfigurationException {
|
|
|
+ if (StringUtils.isEmpty(this.truststoreKeystore.keystoreLocation)) {
|
|
|
+ throw new ConfigurationException(
|
|
|
+ "The keystore location parameter is empty for the ZooKeeper client connection.");
|
|
|
+ }
|
|
|
+ if (StringUtils.isEmpty(this.truststoreKeystore.keystorePassword)) {
|
|
|
+ throw new ConfigurationException(
|
|
|
+ "The keystore password parameter is empty for the ZooKeeper client connection.");
|
|
|
+ }
|
|
|
+ if (StringUtils.isEmpty(this.truststoreKeystore.truststoreLocation)) {
|
|
|
+ throw new ConfigurationException(
|
|
|
+ "The truststore location parameter is empty for the ZooKeeper client connection.");
|
|
|
+ }
|
|
|
+ if (StringUtils.isEmpty(this.truststoreKeystore.truststorePassword)) {
|
|
|
+ throw new ConfigurationException(
|
|
|
+ "The truststore password parameter is empty for the ZooKeeper client connection.");
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
private boolean isJaasConfigurationSet(ZKClientConfig zkClientConfig) {
|
|
|
String clientConfig = zkClientConfig.getProperty(ZKClientConfig.LOGIN_CONTEXT_NAME_KEY,
|
|
|
ZKClientConfig.LOGIN_CONTEXT_NAME_KEY_DEFAULT);
|
|
@@ -503,4 +639,44 @@ public final class ZKCuratorManager {
|
|
|
zkClientConfig.setProperty(ZKClientConfig.LOGIN_CONTEXT_NAME_KEY, JAAS_CLIENT_ENTRY);
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Helper class to contain the Truststore/Keystore paths for the ZK client connection over
|
|
|
+ * SSL/TLS.
|
|
|
+ */
|
|
|
+ public static class TruststoreKeystore {
|
|
|
+ private final String keystoreLocation;
|
|
|
+ private final String keystorePassword;
|
|
|
+ private final String truststoreLocation;
|
|
|
+ private final String truststorePassword;
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Configuration for the ZooKeeper connection when SSL/TLS is enabled.
|
|
|
+ * When a value is not configured, ensure that empty string is set instead of null.
|
|
|
+ *
|
|
|
+ * @param conf ZooKeeper Client configuration
|
|
|
+ */
|
|
|
+ public TruststoreKeystore(Configuration conf) {
|
|
|
+ keystoreLocation = conf.get(CommonConfigurationKeys.ZK_SSL_KEYSTORE_LOCATION, "");
|
|
|
+ keystorePassword = conf.get(CommonConfigurationKeys.ZK_SSL_KEYSTORE_PASSWORD, "");
|
|
|
+ truststoreLocation = conf.get(CommonConfigurationKeys.ZK_SSL_TRUSTSTORE_LOCATION, "");
|
|
|
+ truststorePassword = conf.get(CommonConfigurationKeys.ZK_SSL_TRUSTSTORE_PASSWORD, "");
|
|
|
+ }
|
|
|
+
|
|
|
+ public String getKeystoreLocation() {
|
|
|
+ return keystoreLocation;
|
|
|
+ }
|
|
|
+
|
|
|
+ public String getKeystorePassword() {
|
|
|
+ return keystorePassword;
|
|
|
+ }
|
|
|
+
|
|
|
+ public String getTruststoreLocation() {
|
|
|
+ return truststoreLocation;
|
|
|
+ }
|
|
|
+
|
|
|
+ public String getTruststorePassword() {
|
|
|
+ return truststorePassword;
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|