|
@@ -29,8 +29,10 @@ import java.util.concurrent.locks.ReentrantLock;
|
|
import org.apache.hadoop.HadoopIllegalArgumentException;
|
|
import org.apache.hadoop.HadoopIllegalArgumentException;
|
|
import org.apache.hadoop.classification.InterfaceAudience;
|
|
import org.apache.hadoop.classification.InterfaceAudience;
|
|
import org.apache.hadoop.classification.InterfaceStability;
|
|
import org.apache.hadoop.classification.InterfaceStability;
|
|
|
|
+import org.apache.hadoop.security.SecurityUtil;
|
|
import org.apache.hadoop.util.ZKUtil.ZKAuthInfo;
|
|
import org.apache.hadoop.util.ZKUtil.ZKAuthInfo;
|
|
import org.apache.hadoop.util.StringUtils;
|
|
import org.apache.hadoop.util.StringUtils;
|
|
|
|
+import org.apache.zookeeper.client.ZKClientConfig;
|
|
import org.apache.zookeeper.data.ACL;
|
|
import org.apache.zookeeper.data.ACL;
|
|
import org.apache.zookeeper.KeeperException;
|
|
import org.apache.zookeeper.KeeperException;
|
|
import org.apache.zookeeper.Watcher;
|
|
import org.apache.zookeeper.Watcher;
|
|
@@ -48,6 +50,10 @@ import org.apache.hadoop.util.Preconditions;
|
|
import org.slf4j.Logger;
|
|
import org.slf4j.Logger;
|
|
import org.slf4j.LoggerFactory;
|
|
import org.slf4j.LoggerFactory;
|
|
|
|
|
|
|
|
+import javax.naming.ConfigurationException;
|
|
|
|
+
|
|
|
|
+import org.apache.hadoop.security.SecurityUtil.TruststoreKeystore;
|
|
|
|
+
|
|
/**
|
|
/**
|
|
*
|
|
*
|
|
* This class implements a simple library to perform leader election on top of
|
|
* This class implements a simple library to perform leader election on top of
|
|
@@ -170,6 +176,7 @@ public class ActiveStandbyElector implements StatCallback, StringCallback {
|
|
private final int zkSessionTimeout;
|
|
private final int zkSessionTimeout;
|
|
private final List<ACL> zkAcl;
|
|
private final List<ACL> zkAcl;
|
|
private final List<ZKAuthInfo> zkAuthInfo;
|
|
private final List<ZKAuthInfo> zkAuthInfo;
|
|
|
|
+ private TruststoreKeystore truststoreKeystore;
|
|
private byte[] appData;
|
|
private byte[] appData;
|
|
private final String zkLockFilePath;
|
|
private final String zkLockFilePath;
|
|
private final String zkBreadCrumbPath;
|
|
private final String zkBreadCrumbPath;
|
|
@@ -209,6 +216,7 @@ public class ActiveStandbyElector implements StatCallback, StringCallback {
|
|
* @param app
|
|
* @param app
|
|
* reference to callback interface object
|
|
* reference to callback interface object
|
|
* @param maxRetryNum maxRetryNum.
|
|
* @param maxRetryNum maxRetryNum.
|
|
|
|
+ * @param truststoreKeystore truststore keystore, that we will use for ZK if SSL/TLS is enabled
|
|
* @throws IOException raised on errors performing I/O.
|
|
* @throws IOException raised on errors performing I/O.
|
|
* @throws HadoopIllegalArgumentException
|
|
* @throws HadoopIllegalArgumentException
|
|
* if valid data is not supplied.
|
|
* if valid data is not supplied.
|
|
@@ -218,10 +226,10 @@ public class ActiveStandbyElector implements StatCallback, StringCallback {
|
|
public ActiveStandbyElector(String zookeeperHostPorts,
|
|
public ActiveStandbyElector(String zookeeperHostPorts,
|
|
int zookeeperSessionTimeout, String parentZnodeName, List<ACL> acl,
|
|
int zookeeperSessionTimeout, String parentZnodeName, List<ACL> acl,
|
|
List<ZKAuthInfo> authInfo, ActiveStandbyElectorCallback app,
|
|
List<ZKAuthInfo> authInfo, ActiveStandbyElectorCallback app,
|
|
- int maxRetryNum) throws IOException, HadoopIllegalArgumentException,
|
|
|
|
- KeeperException {
|
|
|
|
|
|
+ int maxRetryNum, TruststoreKeystore truststoreKeystore)
|
|
|
|
+ throws IOException, HadoopIllegalArgumentException, KeeperException {
|
|
this(zookeeperHostPorts, zookeeperSessionTimeout, parentZnodeName, acl,
|
|
this(zookeeperHostPorts, zookeeperSessionTimeout, parentZnodeName, acl,
|
|
- authInfo, app, maxRetryNum, true);
|
|
|
|
|
|
+ authInfo, app, maxRetryNum, true, truststoreKeystore);
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -254,6 +262,7 @@ public class ActiveStandbyElector implements StatCallback, StringCallback {
|
|
* @param failFast
|
|
* @param failFast
|
|
* whether need to add the retry when establishing ZK connection.
|
|
* whether need to add the retry when establishing ZK connection.
|
|
* @param maxRetryNum max Retry Num
|
|
* @param maxRetryNum max Retry Num
|
|
|
|
+ * @param truststoreKeystore truststore keystore, that we will use for ZK if SSL/TLS is enabled
|
|
* @throws IOException
|
|
* @throws IOException
|
|
* raised on errors performing I/O.
|
|
* raised on errors performing I/O.
|
|
* @throws HadoopIllegalArgumentException
|
|
* @throws HadoopIllegalArgumentException
|
|
@@ -264,7 +273,7 @@ public class ActiveStandbyElector implements StatCallback, StringCallback {
|
|
public ActiveStandbyElector(String zookeeperHostPorts,
|
|
public ActiveStandbyElector(String zookeeperHostPorts,
|
|
int zookeeperSessionTimeout, String parentZnodeName, List<ACL> acl,
|
|
int zookeeperSessionTimeout, String parentZnodeName, List<ACL> acl,
|
|
List<ZKAuthInfo> authInfo, ActiveStandbyElectorCallback app,
|
|
List<ZKAuthInfo> authInfo, ActiveStandbyElectorCallback app,
|
|
- int maxRetryNum, boolean failFast) throws IOException,
|
|
|
|
|
|
+ int maxRetryNum, boolean failFast, TruststoreKeystore truststoreKeystore) throws IOException,
|
|
HadoopIllegalArgumentException, KeeperException {
|
|
HadoopIllegalArgumentException, KeeperException {
|
|
if (app == null || acl == null || parentZnodeName == null
|
|
if (app == null || acl == null || parentZnodeName == null
|
|
|| zookeeperHostPorts == null || zookeeperSessionTimeout <= 0) {
|
|
|| zookeeperHostPorts == null || zookeeperSessionTimeout <= 0) {
|
|
@@ -279,6 +288,7 @@ public class ActiveStandbyElector implements StatCallback, StringCallback {
|
|
zkLockFilePath = znodeWorkingDir + "/" + LOCK_FILENAME;
|
|
zkLockFilePath = znodeWorkingDir + "/" + LOCK_FILENAME;
|
|
zkBreadCrumbPath = znodeWorkingDir + "/" + BREADCRUMB_FILENAME;
|
|
zkBreadCrumbPath = znodeWorkingDir + "/" + BREADCRUMB_FILENAME;
|
|
this.maxRetryNum = maxRetryNum;
|
|
this.maxRetryNum = maxRetryNum;
|
|
|
|
+ this.truststoreKeystore = truststoreKeystore;
|
|
|
|
|
|
// establish the ZK Connection for future API calls
|
|
// establish the ZK Connection for future API calls
|
|
if (failFast) {
|
|
if (failFast) {
|
|
@@ -740,7 +750,19 @@ public class ActiveStandbyElector implements StatCallback, StringCallback {
|
|
* @throws IOException raised on errors performing I/O.
|
|
* @throws IOException raised on errors performing I/O.
|
|
*/
|
|
*/
|
|
protected ZooKeeper createZooKeeper() throws IOException {
|
|
protected ZooKeeper createZooKeeper() throws IOException {
|
|
- return new ZooKeeper(zkHostPort, zkSessionTimeout, watcher);
|
|
|
|
|
|
+ ZKClientConfig zkClientConfig = new ZKClientConfig();
|
|
|
|
+ if (truststoreKeystore != null) {
|
|
|
|
+ try {
|
|
|
|
+ SecurityUtil.setSslConfiguration(zkClientConfig, truststoreKeystore);
|
|
|
|
+ } catch (ConfigurationException ce) {
|
|
|
|
+ throw new IOException(ce);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ return initiateZookeeper(zkClientConfig);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ protected ZooKeeper initiateZookeeper(ZKClientConfig zkClientConfig) throws IOException {
|
|
|
|
+ return new ZooKeeper(zkHostPort, zkSessionTimeout, watcher, zkClientConfig);
|
|
}
|
|
}
|
|
|
|
|
|
private void fatalError(String errorMessage) {
|
|
private void fatalError(String errorMessage) {
|