|
@@ -28,12 +28,16 @@ import org.apache.curator.framework.CuratorFramework;
|
|
|
import org.apache.curator.framework.CuratorFrameworkFactory;
|
|
|
import org.apache.curator.framework.api.transaction.CuratorOp;
|
|
|
import org.apache.curator.retry.RetryNTimes;
|
|
|
+import org.apache.curator.utils.ZookeeperFactory;
|
|
|
import org.apache.hadoop.classification.InterfaceAudience;
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
|
|
import org.apache.hadoop.security.SecurityUtil;
|
|
|
import org.apache.hadoop.util.ZKUtil;
|
|
|
import org.apache.zookeeper.CreateMode;
|
|
|
+import org.apache.zookeeper.Watcher;
|
|
|
+import org.apache.zookeeper.ZooKeeper;
|
|
|
+import org.apache.zookeeper.client.ZKClientConfig;
|
|
|
import org.apache.zookeeper.data.ACL;
|
|
|
import org.apache.zookeeper.data.Stat;
|
|
|
import org.slf4j.Logger;
|
|
@@ -148,6 +152,8 @@ public final class ZKCuratorManager {
|
|
|
|
|
|
CuratorFramework client = CuratorFrameworkFactory.builder()
|
|
|
.connectString(zkHostPort)
|
|
|
+ .zookeeperFactory(new HadoopZookeeperFactory(
|
|
|
+ conf.get(CommonConfigurationKeys.ZK_SERVER_PRINCIPAL)))
|
|
|
.sessionTimeoutMs(zkSessionTimeout)
|
|
|
.retryPolicy(retryPolicy)
|
|
|
.authorization(authInfos)
|
|
@@ -428,4 +434,27 @@ public final class ZKCuratorManager {
|
|
|
.forPath(path, data));
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ public static class HadoopZookeeperFactory implements ZookeeperFactory {
|
|
|
+ private final String zkPrincipal;
|
|
|
+
|
|
|
+ public HadoopZookeeperFactory(String zkPrincipal) {
|
|
|
+ this.zkPrincipal = zkPrincipal;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public ZooKeeper newZooKeeper(String connectString, int sessionTimeout,
|
|
|
+ Watcher watcher, boolean canBeReadOnly
|
|
|
+ ) throws Exception {
|
|
|
+ ZKClientConfig zkClientConfig = new ZKClientConfig();
|
|
|
+ if (zkPrincipal != null) {
|
|
|
+ LOG.info("Configuring zookeeper to use {} as the server principal",
|
|
|
+ zkPrincipal);
|
|
|
+ zkClientConfig.setProperty(ZKClientConfig.ZK_SASL_CLIENT_USERNAME,
|
|
|
+ zkPrincipal);
|
|
|
+ }
|
|
|
+ return new ZooKeeper(connectString, sessionTimeout, watcher,
|
|
|
+ canBeReadOnly, zkClientConfig);
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|