Преглед изворни кода

HADOOP-18956. Zookeeper SSL/TLS support in ZKDelegationTokenSecretManager and ZKSignerSecretProvider (#6263)

Istvan Fajth пре 1 година
родитељ
комит
7a55442297
12 измењених фајлова са 982 додато и 209 уклоњено
  1. 34 83
      hadoop-common-project/hadoop-auth/src/main/java/org/apache/hadoop/security/authentication/util/ZKSignerSecretProvider.java
  2. 318 0
      hadoop-common-project/hadoop-auth/src/main/java/org/apache/hadoop/security/authentication/util/ZookeeperClient.java
  3. 15 0
      hadoop-common-project/hadoop-auth/src/site/markdown/Configuration.md
  4. 498 0
      hadoop-common-project/hadoop-auth/src/test/java/org/apache/hadoop/security/authentication/util/TestZookeeperClientCreation.java
  5. 5 0
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
  6. 79 118
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/ZKDelegationTokenSecretManager.java
  7. 12 0
      hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
  8. 4 3
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSZKFailoverController.java
  9. 2 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
  10. 5 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
  11. 4 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ActiveStandbyElectorBasedElectorService.java
  12. 6 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java

+ 34 - 83
hadoop-common-project/hadoop-auth/src/main/java/org/apache/hadoop/security/authentication/util/ZKSignerSecretProvider.java

@@ -16,25 +16,13 @@ package org.apache.hadoop.security.authentication.util;
 import org.apache.hadoop.classification.VisibleForTesting;
 import java.nio.ByteBuffer;
 import java.security.SecureRandom;
-import java.util.Collections;
-import java.util.List;
 import java.util.Properties;
 import java.util.Random;
-import javax.security.auth.login.Configuration;
 import javax.servlet.ServletContext;
-import org.apache.curator.RetryPolicy;
 import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.framework.api.ACLProvider;
-import org.apache.curator.framework.imps.DefaultACLProvider;
-import org.apache.curator.retry.ExponentialBackoffRetry;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.ZooDefs.Perms;
-import org.apache.zookeeper.client.ZKClientConfig;
-import org.apache.zookeeper.data.ACL;
-import org.apache.zookeeper.data.Id;
 import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -92,6 +80,16 @@ public class ZKSignerSecretProvider extends RolloverSignerSecretProvider {
   public static final String ZOOKEEPER_KERBEROS_PRINCIPAL =
           CONFIG_PREFIX + "kerberos.principal";
 
+  public static final String ZOOKEEPER_SSL_ENABLED = CONFIG_PREFIX + "ssl.enabled";
+  public static final String ZOOKEEPER_SSL_KEYSTORE_LOCATION =
+      CONFIG_PREFIX + "ssl.keystore.location";
+  public static final String ZOOKEEPER_SSL_KEYSTORE_PASSWORD =
+      CONFIG_PREFIX + "ssl.keystore.password";
+  public static final String ZOOKEEPER_SSL_TRUSTSTORE_LOCATION =
+      CONFIG_PREFIX + "ssl.truststore.location";
+  public static final String ZOOKEEPER_SSL_TRUSTSTORE_PASSWORD =
+      CONFIG_PREFIX + "ssl.truststore.password";
+
   /**
    * Constant for the property that specifies whether or not the Curator client
    * should disconnect from ZooKeeper on shutdown.  The default is "true".  Only
@@ -350,80 +348,33 @@ public class ZKSignerSecretProvider extends RolloverSignerSecretProvider {
    * This method creates the Curator client and connects to ZooKeeper.
    * @param config configuration properties
    * @return A Curator client
-   * @throws Exception thrown if an error occurred
    */
-  protected CuratorFramework createCuratorClient(Properties config)
-          throws Exception {
-    String connectionString = config.getProperty(
-            ZOOKEEPER_CONNECTION_STRING, "localhost:2181");
-
-    RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
-    ACLProvider aclProvider;
+  protected CuratorFramework createCuratorClient(Properties config) {
+    String connectionString = config.getProperty(ZOOKEEPER_CONNECTION_STRING, "localhost:2181");
     String authType = config.getProperty(ZOOKEEPER_AUTH_TYPE, "none");
-    if (authType.equals("sasl")) {
-      LOG.info("Connecting to ZooKeeper with SASL/Kerberos"
-              + "and using 'sasl' ACLs");
-      String principal = setJaasConfiguration(config);
-      System.setProperty(ZKClientConfig.LOGIN_CONTEXT_NAME_KEY,
-              JAAS_LOGIN_ENTRY_NAME);
-      System.setProperty("zookeeper.authProvider.1",
-              "org.apache.zookeeper.server.auth.SASLAuthenticationProvider");
-      aclProvider = new SASLOwnerACLProvider(principal);
-    } else {  // "none"
-      LOG.info("Connecting to ZooKeeper without authentication");
-      aclProvider = new DefaultACLProvider();     // open to everyone
-    }
-    CuratorFramework cf = CuratorFrameworkFactory.builder()
-            .connectString(connectionString)
-            .retryPolicy(retryPolicy)
-            .aclProvider(aclProvider)
-            .build();
-    cf.start();
-    return cf;
-  }
-
-  private String setJaasConfiguration(Properties config) throws Exception {
-    String keytabFile = config.getProperty(ZOOKEEPER_KERBEROS_KEYTAB).trim();
-    if (keytabFile == null || keytabFile.length() == 0) {
-      throw new IllegalArgumentException(ZOOKEEPER_KERBEROS_KEYTAB
-              + " must be specified");
-    }
-    String principal = config.getProperty(ZOOKEEPER_KERBEROS_PRINCIPAL)
-            .trim();
-    if (principal == null || principal.length() == 0) {
-      throw new IllegalArgumentException(ZOOKEEPER_KERBEROS_PRINCIPAL
-              + " must be specified");
-    }
+    String keytab = config.getProperty(ZOOKEEPER_KERBEROS_KEYTAB, "").trim();
+    String principal = config.getProperty(ZOOKEEPER_KERBEROS_PRINCIPAL, "").trim();
 
-    // This is equivalent to writing a jaas.conf file and setting the system
-    // property, "java.security.auth.login.config", to point to it
-    JaasConfiguration jConf =
-            new JaasConfiguration(JAAS_LOGIN_ENTRY_NAME, principal, keytabFile);
-    Configuration.setConfiguration(jConf);
-    return principal.split("[/@]")[0];
-  }
+    boolean sslEnabled = Boolean.parseBoolean(config.getProperty(ZOOKEEPER_SSL_ENABLED, "false"));
+    String keystoreLocation = config.getProperty(ZOOKEEPER_SSL_KEYSTORE_LOCATION, "");
+    String keystorePassword = config.getProperty(ZOOKEEPER_SSL_KEYSTORE_PASSWORD, "");
+    String truststoreLocation = config.getProperty(ZOOKEEPER_SSL_TRUSTSTORE_LOCATION, "");
+    String truststorePassword = config.getProperty(ZOOKEEPER_SSL_TRUSTSTORE_PASSWORD, "");
 
-  /**
-   * Simple implementation of an {@link ACLProvider} that simply returns an ACL
-   * that gives all permissions only to a single principal.
-   */
-  private static class SASLOwnerACLProvider implements ACLProvider {
-
-    private final List<ACL> saslACL;
-
-    private SASLOwnerACLProvider(String principal) {
-      this.saslACL = Collections.singletonList(
-              new ACL(Perms.ALL, new Id("sasl", principal)));
-    }
-
-    @Override
-    public List<ACL> getDefaultAcl() {
-      return saslACL;
-    }
-
-    @Override
-    public List<ACL> getAclForPath(String path) {
-      return saslACL;
-    }
+    CuratorFramework zkClient =
+        ZookeeperClient.configure()
+            .withConnectionString(connectionString)
+            .withAuthType(authType)
+            .withKeytab(keytab)
+            .withPrincipal(principal)
+            .withJaasLoginEntryName(JAAS_LOGIN_ENTRY_NAME)
+            .enableSSL(sslEnabled)
+            .withKeystore(keystoreLocation)
+            .withKeystorePassword(keystorePassword)
+            .withTruststore(truststoreLocation)
+            .withTruststorePassword(truststorePassword)
+            .create();
+    zkClient.start();
+    return zkClient;
   }
 }

+ 318 - 0
hadoop-common-project/hadoop-auth/src/main/java/org/apache/hadoop/security/authentication/util/ZookeeperClient.java

@@ -0,0 +1,318 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License. See accompanying LICENSE file.
+ */
+
+package org.apache.hadoop.security.authentication.util;
+
+import org.apache.curator.RetryPolicy;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.api.ACLProvider;
+import org.apache.curator.framework.imps.DefaultACLProvider;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.curator.utils.ConfigurableZookeeperFactory;
+import org.apache.curator.utils.ZookeeperFactory;
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.client.ZKClientConfig;
+import org.apache.zookeeper.common.ClientX509Util;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Id;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.security.auth.login.Configuration;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Utility class to create a CuratorFramework object that can be used to connect to Zookeeper
+ * based on configuration values that can be supplied from different configuration properties.
+ * It is used from ZKDelegationTokenSecretManager in hadoop-common, and from
+ * {@link ZKSignerSecretProvider}.
+ *
+ * The class implements a fluid API to set up all the different properties. A very basic setup
+ * would seem like:
+ * <pre>
+ *   ZookeeperClient.configure()
+ *     .withConnectionString(&lt;connectionString&gt;)
+ *     .create();
+ * </pre>
+ *
+ * Mandatory parameters to be set:
+ * <ul>
+ *   <li>connectionString: A Zookeeper connection string.</li>
+ *   <li>if authentication type is set to 'sasl':
+ *   <ul>
+ *     <li>keytab: the location of the keytab to be used for Kerberos authentication</li>
+ *     <li>principal: the Kerberos principal to be used from the supplied Kerberos keytab file.</li>
+ *     <li>jaasLoginEntryName: the login entry name in the JAAS configuration that is created for
+ *                           the KerberosLoginModule to be used by the Zookeeper client code.</li>
+ *   </ul>
+ *   </li>
+ *   <li>if SSL is enabled:
+ *   <ul>
+ *     <li>the location of the Truststore file to be used</li>
+ *     <li>the location of the Keystore file to be used</li>
+ *     <li>if the Truststore is protected by a password, then the password of the Truststore</li>
+ *     <li>if the Keystore is protected by a password, then the password if the Keystore</li>
+ *   </ul>
+ *   </li>
+ * </ul>
+ *
+ * When using 'sasl' authentication type, the JAAS configuration to be used by the Zookeeper client
+ * withing CuratorFramework is set to use the supplied keytab and principal for Kerberos login,
+ * moreover an ACL provider is set to provide a default ACL that requires SASL auth and the same
+ * principal to have access to the used paths.
+ *
+ * When using SSL/TLS, the Zookeeper client will set to use the secure channel towards Zookeeper,
+ * with the specified Keystore and Truststore.
+ *
+ * Default values:
+ * <ul>
+ *   <li>authentication type: 'none'</li>
+ *   <li>sessionTimeout: either the system property curator-default-session-timeout, or 60
+ *                     seconds</li>
+ *   <li>connectionTimeout: either the system property curator-default-connection-timeout, or 15
+ *                        seconds</li>
+ *   <li>retryPolicy: an ExponentialBackoffRetry, with a starting interval of 1 seconds and 3
+ *                  retries</li>
+ *   <li>zkFactory: a ConfigurableZookeeperFactory instance, to allow SSL setup via
+ *                ZKClientConfig</li>
+ * </ul>
+ *
+ * @see ZKSignerSecretProvider
+ */
+public class ZookeeperClient {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ZookeeperClient.class);
+
+  private String connectionString;
+  private String namespace;
+
+  private String authenticationType = "none";
+  private String keytab;
+  private String principal;
+  private String jaasLoginEntryName;
+
+  private int sessionTimeout =
+      Integer.getInteger("curator-default-session-timeout", 60 * 1000);
+  private int connectionTimeout =
+      Integer.getInteger("curator-default-connection-timeout", 15 * 1000);
+
+  private RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
+
+  private ZookeeperFactory zkFactory = new ConfigurableZookeeperFactory();
+
+  private boolean isSSLEnabled;
+  private String keystoreLocation;
+  private String keystorePassword;
+  private String truststoreLocation;
+  private String truststorePassword;
+
+  public static ZookeeperClient configure() {
+    return new ZookeeperClient();
+  }
+
+  public ZookeeperClient withConnectionString(String conn) {
+    connectionString = conn;
+    return this;
+  }
+
+  public ZookeeperClient withNamespace(String ns) {
+    this.namespace = ns;
+    return this;
+  }
+
+  public ZookeeperClient withAuthType(String authType) {
+    this.authenticationType = authType;
+    return this;
+  }
+
+  public ZookeeperClient withKeytab(String keytabPath) {
+    this.keytab = keytabPath;
+    return this;
+  }
+
+  public ZookeeperClient withPrincipal(String princ) {
+    this.principal = princ;
+    return this;
+  }
+
+  public ZookeeperClient withJaasLoginEntryName(String entryName) {
+    this.jaasLoginEntryName = entryName;
+    return this;
+  }
+
+  public ZookeeperClient withSessionTimeout(int timeoutMS) {
+    this.sessionTimeout = timeoutMS;
+    return this;
+  }
+
+  public ZookeeperClient withConnectionTimeout(int timeoutMS) {
+    this.connectionTimeout = timeoutMS;
+    return this;
+  }
+
+  public ZookeeperClient withRetryPolicy(RetryPolicy policy) {
+    this.retryPolicy = policy;
+    return this;
+  }
+
+  public ZookeeperClient withZookeeperFactory(ZookeeperFactory factory) {
+    this.zkFactory = factory;
+    return this;
+  }
+
+  public ZookeeperClient enableSSL(boolean enable) {
+    this.isSSLEnabled = enable;
+    return this;
+  }
+
+  public ZookeeperClient withKeystore(String keystorePath) {
+    this.keystoreLocation = keystorePath;
+    return this;
+  }
+
+  public ZookeeperClient withKeystorePassword(String keystorePass) {
+    this.keystorePassword = keystorePass;
+    return this;
+  }
+
+  public ZookeeperClient withTruststore(String truststorePath) {
+    this.truststoreLocation = truststorePath;
+    return this;
+  }
+
+  public ZookeeperClient withTruststorePassword(String truststorePass) {
+    this.truststorePassword = truststorePass;
+    return this;
+  }
+
+  public CuratorFramework create() {
+    checkNotNull(connectionString, "Zookeeper connection string cannot be null!");
+    checkNotNull(retryPolicy, "Zookeeper connection retry policy cannot be null!");
+
+    return createFrameworkFactoryBuilder()
+        .connectString(connectionString)
+        .zookeeperFactory(zkFactory)
+        .namespace(namespace)
+        .sessionTimeoutMs(sessionTimeout)
+        .connectionTimeoutMs(connectionTimeout)
+        .retryPolicy(retryPolicy)
+        .aclProvider(aclProvider())
+        .zkClientConfig(zkClientConfig())
+        .build();
+  }
+
+  @VisibleForTesting
+  CuratorFrameworkFactory.Builder createFrameworkFactoryBuilder() {
+    return CuratorFrameworkFactory.builder();
+  }
+
+  private ACLProvider aclProvider() {
+    // AuthType has to be explicitly set to 'none' or 'sasl'
+    checkNotNull(authenticationType, "Zookeeper authType cannot be null!");
+    checkArgument(authenticationType.equals("sasl") || authenticationType.equals("none"),
+        "Zookeeper authType must be one of [none, sasl]!");
+
+    ACLProvider aclProvider;
+    if (authenticationType.equals("sasl")) {
+      LOG.info("Connecting to ZooKeeper with SASL/Kerberos and using 'sasl' ACLs.");
+
+      checkArgument(!isEmpty(keytab), "Zookeeper client's Kerberos Keytab must be specified!");
+      checkArgument(!isEmpty(principal),
+          "Zookeeper client's Kerberos Principal must be specified!");
+      checkArgument(!isEmpty(jaasLoginEntryName), "JAAS Login Entry name must be specified!");
+
+      JaasConfiguration jConf = new JaasConfiguration(jaasLoginEntryName, principal, keytab);
+      Configuration.setConfiguration(jConf);
+      System.setProperty(ZKClientConfig.LOGIN_CONTEXT_NAME_KEY, jaasLoginEntryName);
+      System.setProperty("zookeeper.authProvider.1",
+          "org.apache.zookeeper.server.auth.SASLAuthenticationProvider");
+      aclProvider = new SASLOwnerACLProvider(principal.split("[/@]")[0]);
+    } else { // "none"
+      LOG.info("Connecting to ZooKeeper without authentication.");
+      aclProvider = new DefaultACLProvider(); // open to everyone
+    }
+    return aclProvider;
+  }
+
+  private ZKClientConfig zkClientConfig() {
+    ZKClientConfig zkClientConfig = new ZKClientConfig();
+    if (isSSLEnabled){
+      LOG.info("Zookeeper client will use SSL connection. (keystore = {}; truststore = {};)",
+          keystoreLocation, truststoreLocation);
+      checkArgument(!isEmpty(keystoreLocation),
+            "The keystore location parameter is empty for the ZooKeeper client connection.");
+      checkArgument(!isEmpty(truststoreLocation),
+            "The truststore location parameter is empty for the ZooKeeper client connection.");
+
+      try (ClientX509Util sslOpts = new ClientX509Util()) {
+        zkClientConfig.setProperty(ZKClientConfig.SECURE_CLIENT, "true");
+        zkClientConfig.setProperty(ZKClientConfig.ZOOKEEPER_CLIENT_CNXN_SOCKET,
+            "org.apache.zookeeper.ClientCnxnSocketNetty");
+        zkClientConfig.setProperty(sslOpts.getSslKeystoreLocationProperty(), keystoreLocation);
+        zkClientConfig.setProperty(sslOpts.getSslKeystorePasswdProperty(), keystorePassword);
+        zkClientConfig.setProperty(sslOpts.getSslTruststoreLocationProperty(), truststoreLocation);
+        zkClientConfig.setProperty(sslOpts.getSslTruststorePasswdProperty(), truststorePassword);
+      }
+    } else {
+      LOG.info("Zookeeper client will use Plain connection.");
+    }
+    return zkClientConfig;
+  }
+
+  /**
+   * Simple implementation of an {@link ACLProvider} that simply returns an ACL
+   * that gives all permissions only to a single principal.
+   */
+  @VisibleForTesting
+  static final class SASLOwnerACLProvider implements ACLProvider {
+
+    private final List<ACL> saslACL;
+
+    private SASLOwnerACLProvider(String principal) {
+      this.saslACL = Collections.singletonList(
+          new ACL(ZooDefs.Perms.ALL, new Id("sasl", principal)));
+    }
+
+    @Override
+    public List<ACL> getDefaultAcl() {
+      return saslACL;
+    }
+
+    @Override
+    public List<ACL> getAclForPath(String path) {
+      return saslACL;
+    }
+  }
+
+  private boolean isEmpty(String str) {
+    return str == null || str.length() == 0;
+  }
+
+  //Preconditions allowed to be imported from hadoop-common, but that results
+  //  in a circular dependency
+  private void checkNotNull(Object reference, String errorMessage) {
+    if (reference == null) {
+      throw new NullPointerException(errorMessage);
+    }
+  }
+
+  private void checkArgument(boolean expression, String errorMessage) {
+    if (!expression) {
+      throw new IllegalArgumentException(errorMessage);
+    }
+  }
+}

+ 15 - 0
hadoop-common-project/hadoop-auth/src/site/markdown/Configuration.md

@@ -404,6 +404,21 @@ The following configuration properties are specific to the `zookeeper` implement
 *   `signer.secret.provider.zookeeper.kerberos.principal`: Set this to the
     Kerberos principal to use. This only required if using Kerberos.
 
+*   `signer.secret.provider.zookeeper.ssl.enabled` : Set this to true to enable SSL/TLS
+    communication between the server and Zookeeper, if the SignerSecretProvider is zookeeper.
+
+*   `signer.secret.provider.zookeeper.ssl.keystore.location` : Specifies the location of the
+    Zookeeper client's keystore file.
+
+*   `signer.secret.provider.zookeeper.ssl.keystore.password` : Specifies the location of the
+    Zookeeper client's keystore password.
+
+*   `signer.secret.provider.zookeeper.ssl.truststore.location` : Specifies the location of the
+    Zookeeper client's truststore file.
+
+*   `signer.secret.provider.zookeeper.ssl.truststore.password` : Specifies the location of the
+    Zookeeper client's truststore password.
+
 *   `signer.secret.provider.zookeeper.disconnect.on.shutdown`: Whether to close the
     ZooKeeper connection when the provider is shutdown. The default value is `true`.
     Only set this to `false` if a custom Curator client is being provided and

+ 498 - 0
hadoop-common-project/hadoop-auth/src/test/java/org/apache/hadoop/security/authentication/util/TestZookeeperClientCreation.java

@@ -0,0 +1,498 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License. See accompanying LICENSE file.
+ */
+
+package org.apache.hadoop.security.authentication.util;
+
+import org.apache.curator.RetryPolicy;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.imps.DefaultACLProvider;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.curator.utils.ConfigurableZookeeperFactory;
+import org.apache.curator.utils.ZookeeperFactory;
+import org.apache.hadoop.security.authentication.util.ZookeeperClient.SASLOwnerACLProvider;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.client.ZKClientConfig;
+import org.apache.zookeeper.common.ClientX509Util;
+import org.hamcrest.Matcher;
+import org.hamcrest.core.IsNull;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
+
+import javax.security.auth.login.AppConfigurationEntry;
+import javax.security.auth.login.Configuration;
+
+import java.util.Arrays;
+
+import static org.hamcrest.CoreMatchers.anyOf;
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentCaptor.forClass;
+import static org.mockito.ArgumentMatchers.isA;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for ZookeeperClient class, to check if it creates CuratorFramework by providing expected
+ * parameter values to the CuratorFrameworkFactory.Builder instance.
+ */
+public class TestZookeeperClientCreation {
+
+  private ZookeeperClient clientConfigurer;
+  private CuratorFrameworkFactory.Builder cfBuilder;
+
+  @BeforeEach
+  public void setup() {
+    clientConfigurer = spy(ZookeeperClient.configure());
+    clientConfigurer.withConnectionString("dummy");
+    cfBuilder = spy(CuratorFrameworkFactory.builder());
+
+    when(clientConfigurer.createFrameworkFactoryBuilder()).thenReturn(cfBuilder);
+  }
+
+  //Positive tests
+  @Test
+  public void testConnectionStringSet() {
+    clientConfigurer.withConnectionString("conn").create();
+
+    verify(cfBuilder).connectString("conn");
+
+    verifyDefaultZKFactory();
+    verifyDefaultNamespace();
+    verifyDefaultSessionTimeout();
+    verifyDefaultConnectionTimeout();
+    verifyDefaultRetryPolicy();
+    verifyDefaultAclProvider();
+    verifyDefaultZKClientConfig();
+  }
+
+  @Test
+  public void testZookeeperFactorySet() {
+    ZookeeperFactory zkFactory = mock(ZookeeperFactory.class);
+    clientConfigurer.withZookeeperFactory(zkFactory).create();
+
+    verify(cfBuilder).zookeeperFactory(zkFactory);
+
+    verifyDummyConnectionString();
+    verifyDefaultNamespace();
+    verifyDefaultSessionTimeout();
+    verifyDefaultConnectionTimeout();
+    verifyDefaultRetryPolicy();
+    verifyDefaultAclProvider();
+    verifyDefaultZKClientConfig();
+  }
+
+  @Test
+  public void testNameSpaceSet() {
+    clientConfigurer.withNamespace("someNS/someSubSpace").create();
+
+    verify(cfBuilder).namespace("someNS/someSubSpace");
+
+    verifyDummyConnectionString();
+    verifyDefaultZKFactory();
+    verifyDefaultSessionTimeout();
+    verifyDefaultConnectionTimeout();
+    verifyDefaultRetryPolicy();
+    verifyDefaultAclProvider();
+    verifyDefaultZKClientConfig();
+  }
+
+  @Test
+  public void testSessionTimeoutSet() {
+    clientConfigurer.withSessionTimeout(20000).create();
+
+    verify(cfBuilder).sessionTimeoutMs(20000);
+
+    verifyDummyConnectionString();
+    verifyDefaultZKFactory();
+    verifyDefaultNamespace();
+    verifyDefaultConnectionTimeout();
+    verifyDefaultRetryPolicy();
+    verifyDefaultAclProvider();
+    verifyDefaultZKClientConfig();
+  }
+
+
+  @Test
+  public void testDefaultSessionTimeoutIsAffectedBySystemProperty() {
+    System.setProperty("curator-default-session-timeout", "20000");
+    setup();
+    clientConfigurer.create();
+
+    verify(cfBuilder).sessionTimeoutMs(20000);
+
+    verifyDummyConnectionString();
+    verifyDefaultZKFactory();
+    verifyDefaultNamespace();
+    verifyDefaultConnectionTimeout();
+    verifyDefaultRetryPolicy();
+    verifyDefaultAclProvider();
+    verifyDefaultZKClientConfig();
+    System.clearProperty("curator-default-session-timeout");
+  }
+
+  @Test
+  public void testConnectionTimeoutSet() {
+    clientConfigurer.withConnectionTimeout(50).create();
+
+    verify(cfBuilder).connectionTimeoutMs(50);
+
+    verifyDummyConnectionString();
+    verifyDefaultZKFactory();
+    verifyDefaultNamespace();
+    verifyDefaultSessionTimeout();
+    verifyDefaultRetryPolicy();
+    verifyDefaultAclProvider();
+    verifyDefaultZKClientConfig();
+  }
+
+  @Test
+  public void testDefaultConnectionTimeoutIsAffectedBySystemProperty() {
+    System.setProperty("curator-default-connection-timeout", "50");
+    setup();
+    clientConfigurer.create();
+
+    verify(cfBuilder).connectionTimeoutMs(50);
+
+    verifyDummyConnectionString();
+    verifyDefaultZKFactory();
+    verifyDefaultNamespace();
+    verifyDefaultSessionTimeout();
+    verifyDefaultRetryPolicy();
+    verifyDefaultAclProvider();
+    verifyDefaultZKClientConfig();
+    System.clearProperty("curator-default-connection-timeout");
+  }
+
+  @Test
+  public void testRetryPolicySet() {
+    RetryPolicy policy = mock(RetryPolicy.class);
+    clientConfigurer.withRetryPolicy(policy).create();
+
+    verify(cfBuilder).retryPolicy(policy);
+
+    verifyDummyConnectionString();
+    verifyDefaultZKFactory();
+    verifyDefaultNamespace();
+    verifyDefaultSessionTimeout();
+    verifyDefaultConnectionTimeout();
+    verifyDefaultAclProvider();
+    verifyDefaultZKClientConfig();
+  }
+
+  @Test
+  public void testSaslAutTypeWithIBMJava() {
+    testSaslAuthType("IBMJava");
+  }
+
+  @Test
+  public void testSaslAuthTypeWithNonIBMJava() {
+    testSaslAuthType("OracleJava");
+  }
+
+  @Test
+  public void testSSLConfiguration() {
+    clientConfigurer
+        .enableSSL(true)
+        .withKeystore("keystoreLoc")
+        .withKeystorePassword("ksPass")
+        .withTruststore("truststoreLoc")
+        .withTruststorePassword("tsPass")
+        .create();
+
+    ArgumentCaptor<ZKClientConfig> clientConfCaptor = forClass(ZKClientConfig.class);
+    verify(cfBuilder).zkClientConfig(clientConfCaptor.capture());
+    ZKClientConfig conf = clientConfCaptor.getValue();
+
+    assertThat(conf.getProperty(ZKClientConfig.SECURE_CLIENT), is("true"));
+    assertThat(conf.getProperty(ZKClientConfig.ZOOKEEPER_CLIENT_CNXN_SOCKET),
+        is("org.apache.zookeeper.ClientCnxnSocketNetty"));
+    try (ClientX509Util sslOpts = new ClientX509Util()) {
+      assertThat(conf.getProperty(sslOpts.getSslKeystoreLocationProperty()), is("keystoreLoc"));
+      assertThat(conf.getProperty(sslOpts.getSslKeystorePasswdProperty()), is("ksPass"));
+      assertThat(conf.getProperty(sslOpts.getSslTruststoreLocationProperty()), is("truststoreLoc"));
+      assertThat(conf.getProperty(sslOpts.getSslTruststorePasswdProperty()), is("tsPass"));
+    }
+
+    verifyDummyConnectionString();
+    verifyDefaultZKFactory();
+    verifyDefaultNamespace();
+    verifyDefaultSessionTimeout();
+    verifyDefaultConnectionTimeout();
+    verifyDefaultRetryPolicy();
+    verifyDefaultAclProvider();
+  }
+
+  //Negative tests
+  @Test
+  public void testNoConnectionString(){
+    clientConfigurer.withConnectionString(null);
+
+    Throwable t = assertThrows(NullPointerException.class, () -> clientConfigurer.create());
+    assertThat(t.getMessage(), containsString("Zookeeper connection string cannot be null!"));
+  }
+
+  @Test
+  public void testNoRetryPolicy() {
+    clientConfigurer.withRetryPolicy(null);
+
+    Throwable t = assertThrows(NullPointerException.class, () -> clientConfigurer.create());
+    assertThat(t.getMessage(), containsString("Zookeeper connection retry policy cannot be null!"));
+  }
+
+  @Test
+  public void testNoAuthType() {
+    clientConfigurer.withAuthType(null);
+
+    Throwable t = assertThrows(NullPointerException.class, () -> clientConfigurer.create());
+    assertThat(t.getMessage(), containsString("Zookeeper authType cannot be null!"));
+  }
+
+  @Test
+  public void testUnrecognizedAuthType() {
+    clientConfigurer.withAuthType("something");
+
+    Throwable t = assertThrows(IllegalArgumentException.class, () -> clientConfigurer.create());
+    assertThat(t.getMessage(), is("Zookeeper authType must be one of [none, sasl]!"));
+  }
+
+  @Test
+  public void testSaslAuthTypeWithoutKeytab() {
+    clientConfigurer.withAuthType("sasl");
+
+    Throwable t = assertThrows(IllegalArgumentException.class, () -> clientConfigurer.create());
+    assertThat(t.getMessage(), is("Zookeeper client's Kerberos Keytab must be specified!"));
+  }
+
+  @Test
+  public void testSaslAuthTypeWithEmptyKeytab() {
+    clientConfigurer
+        .withAuthType("sasl")
+        .withKeytab("");
+
+    Throwable t = assertThrows(IllegalArgumentException.class, () -> clientConfigurer.create());
+    assertThat(t.getMessage(), is("Zookeeper client's Kerberos Keytab must be specified!"));
+  }
+
+  @Test
+  public void testSaslAuthTypeWithoutPrincipal() {
+    clientConfigurer
+        .withAuthType("sasl")
+        .withKeytab("keytabLoc");
+
+    Throwable t = assertThrows(IllegalArgumentException.class, () -> clientConfigurer.create());
+    assertThat(t.getMessage(), is("Zookeeper client's Kerberos Principal must be specified!"));
+  }
+
+  @Test
+  public void testSaslAuthTypeWithEmptyPrincipal() {
+    clientConfigurer
+        .withAuthType("sasl")
+        .withKeytab("keytabLoc")
+        .withPrincipal("");
+
+    Throwable t = assertThrows(IllegalArgumentException.class, () -> clientConfigurer.create());
+    assertThat(t.getMessage(), is("Zookeeper client's Kerberos Principal must be specified!"));
+  }
+
+  @Test
+  public void testSaslAuthTypeWithoutJaasLoginEntryName() {
+    clientConfigurer
+        .withAuthType("sasl")
+        .withKeytab("keytabLoc")
+        .withPrincipal("principal")
+        .withJaasLoginEntryName(null);
+
+    Throwable t = assertThrows(IllegalArgumentException.class, () -> clientConfigurer.create());
+    assertThat(t.getMessage(), is("JAAS Login Entry name must be specified!"));
+  }
+
+  @Test
+  public void testSaslAuthTypeWithEmptyJaasLoginEntryName() {
+    clientConfigurer
+        .withAuthType("sasl")
+        .withKeytab("keytabLoc")
+        .withPrincipal("principal")
+        .withJaasLoginEntryName("");
+
+    Throwable t = assertThrows(IllegalArgumentException.class, () -> clientConfigurer.create());
+    assertThat(t.getMessage(), is("JAAS Login Entry name must be specified!"));
+  }
+
+  @Test
+  public void testSSLWithoutKeystore() {
+    clientConfigurer
+        .enableSSL(true);
+
+    Throwable t = assertThrows(IllegalArgumentException.class, () -> clientConfigurer.create());
+    assertThat(t.getMessage(),
+        is("The keystore location parameter is empty for the ZooKeeper client connection."));
+  }
+
+  @Test
+  public void testSSLWithEmptyKeystore() {
+    clientConfigurer
+        .enableSSL(true)
+        .withKeystore("");
+
+    Throwable t = assertThrows(IllegalArgumentException.class, () -> clientConfigurer.create());
+    assertThat(t.getMessage(),
+        is("The keystore location parameter is empty for the ZooKeeper client connection."));
+  }
+
+  @Test
+  public void testSSLWithoutTruststore() {
+    clientConfigurer
+        .enableSSL(true)
+        .withKeystore("keyStoreLoc");
+
+    Throwable t = assertThrows(IllegalArgumentException.class, () -> clientConfigurer.create());
+    assertThat(t.getMessage(),
+        is("The truststore location parameter is empty for the ZooKeeper client connection."));
+  }
+
+  @Test
+  public void testSSLWithEmptyTruststore() {
+    clientConfigurer
+        .enableSSL(true)
+        .withKeystore("keyStoreLoc")
+        .withTruststore("");
+
+    Throwable t = assertThrows(IllegalArgumentException.class, () -> clientConfigurer.create());
+    assertThat(t.getMessage(),
+        is("The truststore location parameter is empty for the ZooKeeper client connection."));
+  }
+
+  private void testSaslAuthType(String vendor) {
+    String origVendor = System.getProperty("java.vendor");
+    System.setProperty("java.vendor", vendor);
+    Configuration origConf = Configuration.getConfiguration();
+
+    try {
+      clientConfigurer
+          .withAuthType("sasl")
+          .withKeytab("keytabLoc")
+          .withPrincipal("principal@some.host/SOME.REALM")
+          .withJaasLoginEntryName("TestEntry")
+          .create();
+
+      ArgumentCaptor<SASLOwnerACLProvider> aclProviderCaptor = forClass(SASLOwnerACLProvider.class);
+      verify(cfBuilder).aclProvider(aclProviderCaptor.capture());
+      SASLOwnerACLProvider aclProvider = aclProviderCaptor.getValue();
+
+      assertThat(aclProvider.getDefaultAcl().size(), is(1));
+      assertThat(aclProvider.getDefaultAcl().get(0).getId().getScheme(), is("sasl"));
+      assertThat(aclProvider.getDefaultAcl().get(0).getId().getId(), is("principal"));
+      assertThat(aclProvider.getDefaultAcl().get(0).getPerms(), is(ZooDefs.Perms.ALL));
+
+      Arrays.stream(new String[] {"/", "/foo", "/foo/bar/baz", "/random/path"})
+          .forEach(s -> {
+            assertThat(aclProvider.getAclForPath(s).size(), is(1));
+            assertThat(aclProvider.getAclForPath(s).get(0).getId().getScheme(), is("sasl"));
+            assertThat(aclProvider.getAclForPath(s).get(0).getId().getId(), is("principal"));
+            assertThat(aclProvider.getAclForPath(s).get(0).getPerms(), is(ZooDefs.Perms.ALL));
+          });
+
+      assertThat(System.getProperty(ZKClientConfig.LOGIN_CONTEXT_NAME_KEY), is("TestEntry"));
+      assertThat(System.getProperty("zookeeper.authProvider.1"),
+          is("org.apache.zookeeper.server.auth.SASLAuthenticationProvider"));
+
+      Configuration config = Configuration.getConfiguration();
+      assertThat(config.getAppConfigurationEntry("TestEntry").length, is(1));
+      AppConfigurationEntry entry = config.getAppConfigurationEntry("TestEntry")[0];
+      assertThat(entry.getOptions().get("keyTab"), is("keytabLoc"));
+      assertThat(entry.getOptions().get("principal"), is("principal@some.host/SOME.REALM"));
+      assertThat(entry.getOptions().get("useKeyTab"), is("true"));
+      assertThat(entry.getOptions().get("storeKey"), is("true"));
+      assertThat(entry.getOptions().get("useTicketCache"), is("false"));
+      assertThat(entry.getOptions().get("refreshKrb5Config"), is("true"));
+      if (System.getProperty("java.vendor").contains("IBM")){
+        assertThat(entry.getLoginModuleName(), is("com.ibm.security.auth.module.Krb5LoginModule"));
+      } else {
+        assertThat(entry.getLoginModuleName(), is("com.sun.security.auth.module.Krb5LoginModule"));
+      }
+    } finally {
+      Configuration.setConfiguration(origConf);
+      System.setProperty("java.vendor", origVendor);
+    }
+
+    verifyDummyConnectionString();
+    verifyDefaultZKFactory();
+    verifyDefaultNamespace();
+    verifyDefaultSessionTimeout();
+    verifyDefaultConnectionTimeout();
+    verifyDefaultRetryPolicy();
+    verifyDefaultZKClientConfig();
+  }
+
+  private void verifyDummyConnectionString() {
+    verify(cfBuilder).connectString("dummy");
+  }
+
+  private void verifyDefaultNamespace() {
+    verify(cfBuilder).namespace(null);
+  }
+
+  private void verifyDefaultZKFactory() {
+    verify(cfBuilder).zookeeperFactory(isA(ConfigurableZookeeperFactory.class));
+  }
+
+  private void verifyDefaultSessionTimeout() {
+    verify(cfBuilder).sessionTimeoutMs(60000);
+  }
+
+  private void verifyDefaultConnectionTimeout() {
+    verify(cfBuilder).connectionTimeoutMs(15000);
+  }
+
+  private void verifyDefaultRetryPolicy() {
+    ArgumentCaptor<ExponentialBackoffRetry> retry = forClass(ExponentialBackoffRetry.class);
+    verify(cfBuilder).retryPolicy(retry.capture());
+    ExponentialBackoffRetry policy = retry.getValue();
+
+    assertThat(policy.getBaseSleepTimeMs(), is(1000));
+    assertThat(policy.getN(), is(3));
+  }
+
+  private void verifyDefaultAclProvider() {
+    verify(cfBuilder).aclProvider(isA(DefaultACLProvider.class));
+  }
+
+  private void verifyDefaultZKClientConfig() {
+    ArgumentCaptor<ZKClientConfig> clientConfCaptor = forClass(ZKClientConfig.class);
+    verify(cfBuilder).zkClientConfig(clientConfCaptor.capture());
+    ZKClientConfig conf = clientConfCaptor.getValue();
+
+    assertThat(conf.getProperty(ZKClientConfig.SECURE_CLIENT), isEmptyOrFalse());
+    try (ClientX509Util sslOpts = new ClientX509Util()) {
+      assertThat(conf.getProperty(sslOpts.getSslKeystoreLocationProperty()), isEmpty());
+      assertThat(conf.getProperty(sslOpts.getSslKeystorePasswdProperty()), isEmpty());
+      assertThat(conf.getProperty(sslOpts.getSslTruststoreLocationProperty()), isEmpty());
+      assertThat(conf.getProperty(sslOpts.getSslTruststorePasswdProperty()), isEmpty());
+    }
+  }
+
+  private Matcher<String> isEmptyOrFalse() {
+    return anyOf(isEmpty(), is("false"));
+  }
+
+  private Matcher<String> isEmpty() {
+    return anyOf(new IsNull<>(), is(""));
+  }
+
+}

+ 5 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java

@@ -417,6 +417,10 @@ public class CommonConfigurationKeys extends CommonConfigurationKeysPublic {
   /** How often to retry a ZooKeeper operation  in milliseconds. */
   public static final String ZK_RETRY_INTERVAL_MS =
       ZK_PREFIX + "retry-interval-ms";
+
+  /** SSL enablement for all Hadoop-&gt;ZK communication. */
+  //Note: except ZKSignerSecretProvider in hadoop-auth to avoid circular dependency.
+  public static final String ZK_CLIENT_SSL_ENABLED = ZK_PREFIX + "ssl.enabled";
   /** Keystore location for ZooKeeper client connection over SSL. */
   public static final String ZK_SSL_KEYSTORE_LOCATION = ZK_PREFIX + "ssl.keystore.location";
   /** Keystore password for ZooKeeper client connection over SSL. */
@@ -425,6 +429,7 @@ public class CommonConfigurationKeys extends CommonConfigurationKeysPublic {
   public static final String ZK_SSL_TRUSTSTORE_LOCATION = ZK_PREFIX + "ssl.truststore.location";
   /** Truststore password for ZooKeeper client connection over SSL.  */
   public static final String ZK_SSL_TRUSTSTORE_PASSWORD = ZK_PREFIX + "ssl.truststore.password";
+
   public static final int    ZK_RETRY_INTERVAL_MS_DEFAULT = 1000;
   /** Default domain name resolver for hadoop to use. */
   public static final String HADOOP_DOMAINNAME_RESOLVER_IMPL =

+ 79 - 118
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/ZKDelegationTokenSecretManager.java

@@ -24,18 +24,12 @@ import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.UncheckedIOException;
-import java.util.Collections;
-import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.stream.Stream;
 
-import org.apache.curator.ensemble.fixed.FixedEnsembleProvider;
+import org.apache.curator.RetryPolicy;
 import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.framework.CuratorFrameworkFactory.Builder;
-import org.apache.curator.framework.api.ACLProvider;
-import org.apache.curator.framework.imps.DefaultACLProvider;
 import org.apache.curator.framework.recipes.cache.ChildData;
 import org.apache.curator.framework.recipes.cache.CuratorCache;
 import org.apache.curator.framework.recipes.cache.CuratorCacheBridge;
@@ -43,28 +37,28 @@ import org.apache.curator.framework.recipes.cache.CuratorCacheListener;
 import org.apache.curator.framework.recipes.shared.SharedCount;
 import org.apache.curator.framework.recipes.shared.VersionedValue;
 import org.apache.curator.retry.RetryNTimes;
+import org.apache.curator.utils.ZookeeperFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.security.SecurityUtil;
-import org.apache.hadoop.security.authentication.util.JaasConfiguration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.security.SecurityUtil.TruststoreKeystore;
+import org.apache.hadoop.security.authentication.util.ZookeeperClient;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.delegation.web.DelegationTokenManager;
+
+import static org.apache.hadoop.security.SecurityUtil.getServerPrincipal;
 import static org.apache.hadoop.util.Time.now;
-import org.apache.hadoop.util.curator.ZKCuratorManager;
+
+import org.apache.hadoop.util.curator.ZKCuratorManager.HadoopZookeeperFactory;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.KeeperException.NoNodeException;
-import org.apache.zookeeper.ZooDefs.Perms;
-import org.apache.zookeeper.client.ZKClientConfig;
-import org.apache.zookeeper.data.ACL;
-import org.apache.zookeeper.data.Id;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.hadoop.classification.VisibleForTesting;
-import org.apache.hadoop.util.Preconditions;
 
 /**
  * An implementation of {@link AbstractDelegationTokenSecretManager} that
@@ -104,6 +98,16 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
       + "token.watcher.enabled";
   public static final boolean ZK_DTSM_TOKEN_WATCHER_ENABLED_DEFAULT = true;
 
+  public static final String ZK_DTSM_ZK_SSL_ENABLED = ZK_CONF_PREFIX + "ssl.enabled";
+  public static final String ZK_DTSM_ZK_SSL_KEYSTORE_LOCATION =
+      ZK_CONF_PREFIX + "ssl.keystore.location";
+  public static final String ZK_DTSM_ZK_SSL_KEYSTORE_PASSWORD =
+      ZK_CONF_PREFIX + "ssl.keystore.password";
+  public static final String ZK_DTSM_ZK_SSL_TRUSTSTORE_LOCATION =
+      ZK_CONF_PREFIX + "ssl.truststore.location";
+  public static final String ZK_DTSM_ZK_SSL_TRUSTSTORE_PASSWORD =
+      ZK_CONF_PREFIX + "ssl.truststore.password";
+
   public static final int ZK_DTSM_ZK_NUM_RETRIES_DEFAULT = 3;
   public static final int ZK_DTSM_ZK_SESSION_TIMEOUT_DEFAULT = 10000;
   public static final int ZK_DTSM_ZK_CONNECTION_TIMEOUT_DEFAULT = 10000;
@@ -166,93 +170,74 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
     isTokenWatcherEnabled = conf.getBoolean(ZK_DTSM_TOKEN_WATCHER_ENABLED,
         ZK_DTSM_TOKEN_WATCHER_ENABLED_DEFAULT);
     this.currentSeqNumLock = new ReentrantLock(true);
+
+    String workPath = conf.get(ZK_DTSM_ZNODE_WORKING_PATH, ZK_DTSM_ZNODE_WORKING_PATH_DEAFULT);
+    String nameSpace = workPath + "/" + ZK_DTSM_NAMESPACE;
     if (CURATOR_TL.get() != null) {
-      zkClient =
-          CURATOR_TL.get().usingNamespace(
-              conf.get(ZK_DTSM_ZNODE_WORKING_PATH,
-                  ZK_DTSM_ZNODE_WORKING_PATH_DEAFULT)
-                  + "/" + ZK_DTSM_NAMESPACE);
+      zkClient = CURATOR_TL.get().usingNamespace(nameSpace);
       isExternalClient = true;
     } else {
-      String connString = conf.get(ZK_DTSM_ZK_CONNECTION_STRING);
-      Preconditions.checkNotNull(connString,
-          "Zookeeper connection string cannot be null");
-      String authType = conf.get(ZK_DTSM_ZK_AUTH_TYPE);
-
-      // AuthType has to be explicitly set to 'none' or 'sasl'
-      Preconditions.checkNotNull(authType, "Zookeeper authType cannot be null !!");
-      Preconditions.checkArgument(
-          authType.equals("sasl") || authType.equals("none"),
-          "Zookeeper authType must be one of [none, sasl]");
-
-      Builder builder = null;
-      try {
-        ACLProvider aclProvider = null;
-        if (authType.equals("sasl")) {
-          LOG.info("Connecting to ZooKeeper with SASL/Kerberos"
-              + "and using 'sasl' ACLs");
-          String principal = setJaasConfiguration(conf);
-          System.setProperty(ZKClientConfig.LOGIN_CONTEXT_NAME_KEY,
-                             JAAS_LOGIN_ENTRY_NAME);
-          System.setProperty("zookeeper.authProvider.1",
-              "org.apache.zookeeper.server.auth.SASLAuthenticationProvider");
-          aclProvider = new SASLOwnerACLProvider(principal);
-        } else { // "none"
-          LOG.info("Connecting to ZooKeeper without authentication");
-          aclProvider = new DefaultACLProvider(); // open to everyone
-        }
-        int sessionT =
-            conf.getInt(ZK_DTSM_ZK_SESSION_TIMEOUT,
-                ZK_DTSM_ZK_SESSION_TIMEOUT_DEFAULT);
-        int numRetries =
-            conf.getInt(ZK_DTSM_ZK_NUM_RETRIES, ZK_DTSM_ZK_NUM_RETRIES_DEFAULT);
-        builder =
-            CuratorFrameworkFactory
-                .builder()
-                .zookeeperFactory(new ZKCuratorManager.HadoopZookeeperFactory(
-                    conf.get(ZK_DTSM_ZK_KERBEROS_SERVER_PRINCIPAL)))
-                .aclProvider(aclProvider)
-                .namespace(
-                    conf.get(ZK_DTSM_ZNODE_WORKING_PATH,
-                        ZK_DTSM_ZNODE_WORKING_PATH_DEAFULT)
-                        + "/"
-                        + ZK_DTSM_NAMESPACE
-                )
-                .sessionTimeoutMs(sessionT)
-                .connectionTimeoutMs(
-                    conf.getInt(ZK_DTSM_ZK_CONNECTION_TIMEOUT,
-                        ZK_DTSM_ZK_CONNECTION_TIMEOUT_DEFAULT)
-                )
-                .retryPolicy(
-                    new RetryNTimes(numRetries, numRetries == 0 ? 0 : sessionT / numRetries));
-      } catch (Exception ex) {
-        throw new RuntimeException("Could not Load ZK acls or auth: " + ex, ex);
-      }
-      zkClient = builder.ensembleProvider(new FixedEnsembleProvider(connString))
-          .build();
+      zkClient = createCuratorClient(conf, nameSpace);
       isExternalClient = false;
     }
   }
 
-  private String setJaasConfiguration(Configuration config) throws Exception {
-    String keytabFile =
-        config.get(ZK_DTSM_ZK_KERBEROS_KEYTAB, "").trim();
-    if (keytabFile == null || keytabFile.length() == 0) {
-      throw new IllegalArgumentException(ZK_DTSM_ZK_KERBEROS_KEYTAB
-          + " must be specified");
-    }
-    String principal =
-        config.get(ZK_DTSM_ZK_KERBEROS_PRINCIPAL, "").trim();
-    principal = SecurityUtil.getServerPrincipal(principal, "");
-    if (principal == null || principal.length() == 0) {
-      throw new IllegalArgumentException(ZK_DTSM_ZK_KERBEROS_PRINCIPAL
-          + " must be specified");
+  @VisibleForTesting
+  static CuratorFramework createCuratorClient(Configuration conf, String namespace) {
+    try {
+      String connString = conf.get(ZK_DTSM_ZK_CONNECTION_STRING);
+      String authType = conf.get(ZK_DTSM_ZK_AUTH_TYPE);
+      String keytab = conf.get(ZK_DTSM_ZK_KERBEROS_KEYTAB, "").trim();
+      String principal = getServerPrincipal(conf.get(ZK_DTSM_ZK_KERBEROS_PRINCIPAL, "").trim(), "");
+      int sessionTimeout =
+          conf.getInt(ZK_DTSM_ZK_SESSION_TIMEOUT, ZK_DTSM_ZK_SESSION_TIMEOUT_DEFAULT);
+      int connectionTimeout =
+          conf.getInt(ZK_DTSM_ZK_CONNECTION_TIMEOUT, ZK_DTSM_ZK_CONNECTION_TIMEOUT_DEFAULT);
+      int retryCount =
+          conf.getInt(ZK_DTSM_ZK_NUM_RETRIES, ZK_DTSM_ZK_NUM_RETRIES_DEFAULT);
+      RetryPolicy retryPolicy =
+          new RetryNTimes(retryCount, retryCount == 0 ? 0 : sessionTimeout / retryCount);
+
+      boolean isSSLEnabled =
+          conf.getBoolean(CommonConfigurationKeys.ZK_CLIENT_SSL_ENABLED,
+              conf.getBoolean(ZK_DTSM_ZK_SSL_ENABLED, false));
+      String keystoreLocation = conf.get(ZK_DTSM_ZK_SSL_KEYSTORE_LOCATION,
+          conf.get(CommonConfigurationKeys.ZK_SSL_KEYSTORE_LOCATION, ""));
+      String keystorePassword = conf.get(ZK_DTSM_ZK_SSL_KEYSTORE_PASSWORD,
+          conf.get(CommonConfigurationKeys.ZK_SSL_KEYSTORE_PASSWORD, ""));
+      String truststoreLocation = conf.get(ZK_DTSM_ZK_SSL_TRUSTSTORE_LOCATION,
+          conf.get(CommonConfigurationKeys.ZK_SSL_TRUSTSTORE_LOCATION, ""));
+      String truststorePassword = conf.get(ZK_DTSM_ZK_SSL_TRUSTSTORE_PASSWORD,
+          conf.get(CommonConfigurationKeys.ZK_SSL_TRUSTSTORE_PASSWORD, ""));
+
+      ZookeeperFactory zkFactory = new HadoopZookeeperFactory(
+          conf.get(ZK_DTSM_ZK_KERBEROS_SERVER_PRINCIPAL),
+          conf.get(ZK_DTSM_ZK_KERBEROS_PRINCIPAL),
+          conf.get(ZK_DTSM_ZK_KERBEROS_KEYTAB),
+          isSSLEnabled,
+          new TruststoreKeystore(conf));
+
+
+      return ZookeeperClient.configure()
+          .withConnectionString(connString)
+          .withNamespace(namespace)
+          .withZookeeperFactory(zkFactory)
+          .withAuthType(authType)
+          .withKeytab(keytab)
+          .withPrincipal(principal)
+          .withJaasLoginEntryName(JAAS_LOGIN_ENTRY_NAME)
+          .withRetryPolicy(retryPolicy)
+          .withSessionTimeout(sessionTimeout)
+          .withConnectionTimeout(connectionTimeout)
+          .enableSSL(isSSLEnabled)
+          .withKeystore(keystoreLocation)
+          .withKeystorePassword(keystorePassword)
+          .withTruststore(truststoreLocation)
+          .withTruststorePassword(truststorePassword)
+          .create();
+    } catch (Exception ex) {
+      throw new RuntimeException("Could not Load ZK acls or auth: " + ex, ex);
     }
-
-    JaasConfiguration jConf =
-        new JaasConfiguration(JAAS_LOGIN_ENTRY_NAME, principal, keytabFile);
-    javax.security.auth.login.Configuration.setConfiguration(jConf);
-    return principal.split("[/@]")[0];
   }
 
   @Override
@@ -888,30 +873,6 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
     return isTokenWatcherEnabled;
   }
 
-  /**
-   * Simple implementation of an {@link ACLProvider} that simply returns an ACL
-   * that gives all permissions only to a single principal.
-   */
-  private static class SASLOwnerACLProvider implements ACLProvider {
-
-    private final List<ACL> saslACL;
-
-    private SASLOwnerACLProvider(String principal) {
-      this.saslACL = Collections.singletonList(
-          new ACL(Perms.ALL, new Id("sasl", principal)));
-    }
-
-    @Override
-    public List<ACL> getDefaultAcl() {
-      return saslACL;
-    }
-
-    @Override
-    public List<ACL> getAclForPath(String path) {
-      return saslACL;
-    }
-  }
-
   @VisibleForTesting
   @Private
   @Unstable

+ 12 - 0
hadoop-common-project/hadoop-common/src/main/resources/core-default.xml

@@ -4255,6 +4255,18 @@ The switch to turn S3A auditing on or off.
     </description>
   </property>
 
+  <property>
+    <name>hadoop.zk.ssl.enabled</name>
+    <decription>
+      Enable SSL/TLS encryption for the ZooKeeper communication.
+      Note: this setting overrides dfs.ha.zkfc.client.ssl.enabled,
+      yarn.resourcemanager.zk-client-ssl.enabled and also
+      hadoop.kms.authentication.zk-dt-secret-manager.ssl.enabled in order to unify the SSL based
+      Zookeeper access across Hadoop. Leaving this property empty ensures that service specific
+      enablement can be done separately.
+    </decription>
+  </property>
+
   <property>
     <name>hadoop.zk.ssl.keystore.location</name>
     <description>

+ 4 - 3
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSZKFailoverController.java

@@ -30,6 +30,7 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.HadoopIllegalArgumentException;
@@ -297,8 +298,8 @@ public class DFSZKFailoverController extends ZKFailoverController {
 
   @Override
   protected boolean isSSLEnabled() {
-    return conf.getBoolean(
-        DFSConfigKeys.ZK_CLIENT_SSL_ENABLED,
-        DFSConfigKeys.DEFAULT_ZK_CLIENT_SSL_ENABLED);
+    return conf.getBoolean(CommonConfigurationKeys.ZK_CLIENT_SSL_ENABLED,
+        conf.getBoolean(DFSConfigKeys.ZK_CLIENT_SSL_ENABLED,
+            DFSConfigKeys.DEFAULT_ZK_CLIENT_SSL_ENABLED));
   }
 }

+ 2 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml

@@ -3802,6 +3802,8 @@
   <value>false</value>
   <description>
     Enable SSL/TLS encryption for the ZooKeeper communication from ZKFC.
+    Note: if hadoop.zk.ssl.enabled is set to a value, then that central setting has precedence,
+    and this value will be overridden by the value of hadoop.zk.ssl.enabled.
   </description>
 </property>
 

+ 5 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml

@@ -742,7 +742,11 @@
   </property>
 
   <property>
-    <description>Enable SSL/TLS encryption for the ZooKeeper communication.</description>
+    <description>
+      Enable SSL/TLS encryption for the ZooKeeper communication.
+      Note: if hadoop.zk.ssl.enabled is set to a value, then that central setting has precedence,
+      and this value will be overridden by the value of hadoop.zk.ssl.enabled.
+    </description>
     <name>yarn.resourcemanager.zk-client-ssl.enabled</name>
     <value>false</value>
   </property>

+ 4 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ActiveStandbyElectorBasedElectorService.java

@@ -105,8 +105,10 @@ public class ActiveStandbyElectorBasedElectorService extends AbstractService
         conf.getInt(YarnConfiguration.RM_HA_FC_ELECTOR_ZK_RETRIES_KEY, conf
           .getInt(CommonConfigurationKeys.HA_FC_ELECTOR_ZK_OP_RETRIES_KEY,
             CommonConfigurationKeys.HA_FC_ELECTOR_ZK_OP_RETRIES_DEFAULT));
-    boolean isSSLEnabled = conf.getBoolean(YarnConfiguration.RM_ZK_CLIENT_SSL_ENABLED,
-            YarnConfiguration.DEFAULT_RM_ZK_CLIENT_SSL_ENABLED);
+    boolean isSSLEnabled =
+        conf.getBoolean(CommonConfigurationKeys.ZK_CLIENT_SSL_ENABLED,
+            conf.getBoolean(YarnConfiguration.RM_ZK_CLIENT_SSL_ENABLED,
+                YarnConfiguration.DEFAULT_RM_ZK_CLIENT_SSL_ENABLED));
     SecurityUtil.TruststoreKeystore truststoreKeystore
             = isSSLEnabled ? new SecurityUtil.TruststoreKeystore(conf) : null;
     elector = new ActiveStandbyElector(zkQuorum, (int) zkSessionTimeout,

+ 6 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java

@@ -19,6 +19,7 @@
 package org.apache.hadoop.yarn.server.resourcemanager;
 
 import org.apache.commons.lang3.math.NumberUtils;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.hadoop.classification.VisibleForTesting;
 import com.sun.jersey.spi.container.servlet.ServletContainer;
@@ -427,8 +428,11 @@ public class ResourceManager extends CompositeService
       authInfos.add(authInfo);
     }
 
-    manager.start(authInfos, config.getBoolean(YarnConfiguration.RM_ZK_CLIENT_SSL_ENABLED,
-        YarnConfiguration.DEFAULT_RM_ZK_CLIENT_SSL_ENABLED));
+    boolean isSSLEnabled =
+        config.getBoolean(CommonConfigurationKeys.ZK_CLIENT_SSL_ENABLED,
+            config.getBoolean(YarnConfiguration.RM_ZK_CLIENT_SSL_ENABLED,
+                YarnConfiguration.DEFAULT_RM_ZK_CLIENT_SSL_ENABLED));
+    manager.start(authInfos, isSSLEnabled);
     return manager;
   }