|
@@ -46,6 +46,8 @@ import org.apache.zookeeper.client.ConnectStringParser;
|
|
import org.apache.zookeeper.client.HostProvider;
|
|
import org.apache.zookeeper.client.HostProvider;
|
|
import org.apache.zookeeper.client.StaticHostProvider;
|
|
import org.apache.zookeeper.client.StaticHostProvider;
|
|
import org.apache.zookeeper.client.ZKClientConfig;
|
|
import org.apache.zookeeper.client.ZKClientConfig;
|
|
|
|
+import org.apache.zookeeper.client.ZooKeeperBuilder;
|
|
|
|
+import org.apache.zookeeper.client.ZooKeeperOptions;
|
|
import org.apache.zookeeper.client.ZooKeeperSaslClient;
|
|
import org.apache.zookeeper.client.ZooKeeperSaslClient;
|
|
import org.apache.zookeeper.common.PathUtils;
|
|
import org.apache.zookeeper.common.PathUtils;
|
|
import org.apache.zookeeper.data.ACL;
|
|
import org.apache.zookeeper.data.ACL;
|
|
@@ -445,7 +447,9 @@ public class ZooKeeper implements AutoCloseable {
|
|
* if an invalid chroot path is specified
|
|
* if an invalid chroot path is specified
|
|
*/
|
|
*/
|
|
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher) throws IOException {
|
|
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher) throws IOException {
|
|
- this(connectString, sessionTimeout, watcher, false);
|
|
|
|
|
|
+ this(new ZooKeeperBuilder(connectString, sessionTimeout)
|
|
|
|
+ .withDefaultWatcher(watcher)
|
|
|
|
+ .toOptions());
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -498,7 +502,10 @@ public class ZooKeeper implements AutoCloseable {
|
|
int sessionTimeout,
|
|
int sessionTimeout,
|
|
Watcher watcher,
|
|
Watcher watcher,
|
|
ZKClientConfig conf) throws IOException {
|
|
ZKClientConfig conf) throws IOException {
|
|
- this(connectString, sessionTimeout, watcher, false, conf);
|
|
|
|
|
|
+ this(new ZooKeeperBuilder(connectString, sessionTimeout)
|
|
|
|
+ .withDefaultWatcher(watcher)
|
|
|
|
+ .withClientConfig(conf)
|
|
|
|
+ .toOptions());
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -564,7 +571,11 @@ public class ZooKeeper implements AutoCloseable {
|
|
Watcher watcher,
|
|
Watcher watcher,
|
|
boolean canBeReadOnly,
|
|
boolean canBeReadOnly,
|
|
HostProvider aHostProvider) throws IOException {
|
|
HostProvider aHostProvider) throws IOException {
|
|
- this(connectString, sessionTimeout, watcher, canBeReadOnly, aHostProvider, null);
|
|
|
|
|
|
+ this(new ZooKeeperBuilder(connectString, sessionTimeout)
|
|
|
|
+ .withDefaultWatcher(watcher)
|
|
|
|
+ .withCanBeReadOnly(canBeReadOnly)
|
|
|
|
+ .withHostProvider(ignored -> aHostProvider)
|
|
|
|
+ .toOptions());
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -634,25 +645,12 @@ public class ZooKeeper implements AutoCloseable {
|
|
HostProvider hostProvider,
|
|
HostProvider hostProvider,
|
|
ZKClientConfig clientConfig
|
|
ZKClientConfig clientConfig
|
|
) throws IOException {
|
|
) throws IOException {
|
|
- LOG.info(
|
|
|
|
- "Initiating client connection, connectString={} sessionTimeout={} watcher={}",
|
|
|
|
- connectString,
|
|
|
|
- sessionTimeout,
|
|
|
|
- watcher);
|
|
|
|
-
|
|
|
|
- this.clientConfig = clientConfig != null ? clientConfig : new ZKClientConfig();
|
|
|
|
- this.hostProvider = hostProvider;
|
|
|
|
- ConnectStringParser connectStringParser = new ConnectStringParser(connectString);
|
|
|
|
-
|
|
|
|
- cnxn = createConnection(
|
|
|
|
- connectStringParser.getChrootPath(),
|
|
|
|
- hostProvider,
|
|
|
|
- sessionTimeout,
|
|
|
|
- this.clientConfig,
|
|
|
|
- watcher,
|
|
|
|
- getClientCnxnSocket(),
|
|
|
|
- canBeReadOnly);
|
|
|
|
- cnxn.start();
|
|
|
|
|
|
+ this(new ZooKeeperBuilder(connectString, sessionTimeout)
|
|
|
|
+ .withDefaultWatcher(watcher)
|
|
|
|
+ .withCanBeReadOnly(canBeReadOnly)
|
|
|
|
+ .withHostProvider(ignored -> hostProvider)
|
|
|
|
+ .withClientConfig(clientConfig)
|
|
|
|
+ .toOptions());
|
|
}
|
|
}
|
|
|
|
|
|
ClientCnxn createConnection(
|
|
ClientCnxn createConnection(
|
|
@@ -662,6 +660,8 @@ public class ZooKeeper implements AutoCloseable {
|
|
ZKClientConfig clientConfig,
|
|
ZKClientConfig clientConfig,
|
|
Watcher defaultWatcher,
|
|
Watcher defaultWatcher,
|
|
ClientCnxnSocket clientCnxnSocket,
|
|
ClientCnxnSocket clientCnxnSocket,
|
|
|
|
+ long sessionId,
|
|
|
|
+ byte[] sessionPasswd,
|
|
boolean canBeReadOnly
|
|
boolean canBeReadOnly
|
|
) throws IOException {
|
|
) throws IOException {
|
|
return new ClientCnxn(
|
|
return new ClientCnxn(
|
|
@@ -671,6 +671,8 @@ public class ZooKeeper implements AutoCloseable {
|
|
clientConfig,
|
|
clientConfig,
|
|
defaultWatcher,
|
|
defaultWatcher,
|
|
clientCnxnSocket,
|
|
clientCnxnSocket,
|
|
|
|
+ sessionId,
|
|
|
|
+ sessionPasswd,
|
|
canBeReadOnly);
|
|
canBeReadOnly);
|
|
}
|
|
}
|
|
|
|
|
|
@@ -731,7 +733,10 @@ public class ZooKeeper implements AutoCloseable {
|
|
int sessionTimeout,
|
|
int sessionTimeout,
|
|
Watcher watcher,
|
|
Watcher watcher,
|
|
boolean canBeReadOnly) throws IOException {
|
|
boolean canBeReadOnly) throws IOException {
|
|
- this(connectString, sessionTimeout, watcher, canBeReadOnly, createDefaultHostProvider(connectString));
|
|
|
|
|
|
+ this(new ZooKeeperBuilder(connectString, sessionTimeout)
|
|
|
|
+ .withDefaultWatcher(watcher)
|
|
|
|
+ .withCanBeReadOnly(canBeReadOnly)
|
|
|
|
+ .toOptions());
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -794,13 +799,11 @@ public class ZooKeeper implements AutoCloseable {
|
|
Watcher watcher,
|
|
Watcher watcher,
|
|
boolean canBeReadOnly,
|
|
boolean canBeReadOnly,
|
|
ZKClientConfig conf) throws IOException {
|
|
ZKClientConfig conf) throws IOException {
|
|
- this(
|
|
|
|
- connectString,
|
|
|
|
- sessionTimeout,
|
|
|
|
- watcher,
|
|
|
|
- canBeReadOnly,
|
|
|
|
- createDefaultHostProvider(connectString),
|
|
|
|
- conf);
|
|
|
|
|
|
+ this(new ZooKeeperBuilder(connectString, sessionTimeout)
|
|
|
|
+ .withDefaultWatcher(watcher)
|
|
|
|
+ .withCanBeReadOnly(canBeReadOnly)
|
|
|
|
+ .withClientConfig(conf)
|
|
|
|
+ .toOptions());
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -861,7 +864,10 @@ public class ZooKeeper implements AutoCloseable {
|
|
Watcher watcher,
|
|
Watcher watcher,
|
|
long sessionId,
|
|
long sessionId,
|
|
byte[] sessionPasswd) throws IOException {
|
|
byte[] sessionPasswd) throws IOException {
|
|
- this(connectString, sessionTimeout, watcher, sessionId, sessionPasswd, false);
|
|
|
|
|
|
+ this(new ZooKeeperBuilder(connectString, sessionTimeout)
|
|
|
|
+ .withDefaultWatcher(watcher)
|
|
|
|
+ .withSession(sessionId, sessionPasswd)
|
|
|
|
+ .toOptions());
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -936,15 +942,12 @@ public class ZooKeeper implements AutoCloseable {
|
|
byte[] sessionPasswd,
|
|
byte[] sessionPasswd,
|
|
boolean canBeReadOnly,
|
|
boolean canBeReadOnly,
|
|
HostProvider aHostProvider) throws IOException {
|
|
HostProvider aHostProvider) throws IOException {
|
|
- this(
|
|
|
|
- connectString,
|
|
|
|
- sessionTimeout,
|
|
|
|
- watcher,
|
|
|
|
- sessionId,
|
|
|
|
- sessionPasswd,
|
|
|
|
- canBeReadOnly,
|
|
|
|
- aHostProvider,
|
|
|
|
- null);
|
|
|
|
|
|
+ this(new ZooKeeperBuilder(connectString, sessionTimeout)
|
|
|
|
+ .withDefaultWatcher(watcher)
|
|
|
|
+ .withSession(sessionId, sessionPasswd)
|
|
|
|
+ .withCanBeReadOnly(canBeReadOnly)
|
|
|
|
+ .withHostProvider(ignored -> aHostProvider)
|
|
|
|
+ .toOptions());
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -1025,20 +1028,72 @@ public class ZooKeeper implements AutoCloseable {
|
|
boolean canBeReadOnly,
|
|
boolean canBeReadOnly,
|
|
HostProvider hostProvider,
|
|
HostProvider hostProvider,
|
|
ZKClientConfig clientConfig) throws IOException {
|
|
ZKClientConfig clientConfig) throws IOException {
|
|
- LOG.info(
|
|
|
|
- "Initiating client connection, connectString={} "
|
|
|
|
- + "sessionTimeout={} watcher={} sessionId=0x{} sessionPasswd={}",
|
|
|
|
- connectString,
|
|
|
|
- sessionTimeout,
|
|
|
|
- watcher,
|
|
|
|
- Long.toHexString(sessionId),
|
|
|
|
- (sessionPasswd == null ? "<null>" : "<hidden>"));
|
|
|
|
|
|
+ this(new ZooKeeperBuilder(connectString, sessionTimeout)
|
|
|
|
+ .withSession(sessionId, sessionPasswd)
|
|
|
|
+ .withDefaultWatcher(watcher)
|
|
|
|
+ .withCanBeReadOnly(canBeReadOnly)
|
|
|
|
+ .withHostProvider(ignored -> hostProvider)
|
|
|
|
+ .withClientConfig(clientConfig)
|
|
|
|
+ .toOptions());
|
|
|
|
+ }
|
|
|
|
|
|
|
|
+ /**
|
|
|
|
+ * Create a ZooKeeper client and establish session asynchronously.
|
|
|
|
+ *
|
|
|
|
+ * <p>This constructor will initiate connection to the server and return
|
|
|
|
+ * immediately - potentially (usually) before the session is fully established.
|
|
|
|
+ * The watcher from options will be notified of any changes in state. This
|
|
|
|
+ * notification can come at any point before or after the constructor call
|
|
|
|
+ * has returned.
|
|
|
|
+ *
|
|
|
|
+ * <p>The instantiated ZooKeeper client object will pick an arbitrary server
|
|
|
|
+ * from the connect string and attempt to connect to it. If establishment of
|
|
|
|
+ * the connection fails, another server in the connect string will be tried
|
|
|
|
+ * (the order is non-deterministic, as we random shuffle the list), until a
|
|
|
|
+ * connection is established. The client will continue attempts until the
|
|
|
|
+ * session is explicitly closed (or the session is expired by the server).
|
|
|
|
+ *
|
|
|
|
+ * @param options options for ZooKeeper client
|
|
|
|
+ * @throws IOException in cases of IO failure
|
|
|
|
+ */
|
|
|
|
+ @InterfaceAudience.Private
|
|
|
|
+ public ZooKeeper(ZooKeeperOptions options) throws IOException {
|
|
|
|
+ String connectString = options.getConnectString();
|
|
|
|
+ int sessionTimeout = options.getSessionTimeout();
|
|
|
|
+ long sessionId = options.getSessionId();
|
|
|
|
+ byte[] sessionPasswd = sessionId == 0 ? new byte[16] : options.getSessionPasswd();
|
|
|
|
+ Watcher watcher = options.getDefaultWatcher();
|
|
|
|
+ boolean canBeReadOnly = options.isCanBeReadOnly();
|
|
|
|
+
|
|
|
|
+ if (sessionId == 0) {
|
|
|
|
+ LOG.info(
|
|
|
|
+ "Initiating client connection, connectString={} sessionTimeout={} watcher={}",
|
|
|
|
+ connectString,
|
|
|
|
+ sessionTimeout,
|
|
|
|
+ watcher);
|
|
|
|
+ } else {
|
|
|
|
+ LOG.info(
|
|
|
|
+ "Initiating client connection, connectString={} "
|
|
|
|
+ + "sessionTimeout={} watcher={} sessionId=0x{} sessionPasswd={}",
|
|
|
|
+ connectString,
|
|
|
|
+ sessionTimeout,
|
|
|
|
+ watcher,
|
|
|
|
+ Long.toHexString(sessionId),
|
|
|
|
+ (sessionPasswd == null ? "<null>" : "<hidden>"));
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ ZKClientConfig clientConfig = options.getClientConfig();
|
|
this.clientConfig = clientConfig != null ? clientConfig : new ZKClientConfig();
|
|
this.clientConfig = clientConfig != null ? clientConfig : new ZKClientConfig();
|
|
ConnectStringParser connectStringParser = new ConnectStringParser(connectString);
|
|
ConnectStringParser connectStringParser = new ConnectStringParser(connectString);
|
|
|
|
+ HostProvider hostProvider;
|
|
|
|
+ if (options.getHostProvider() != null) {
|
|
|
|
+ hostProvider = options.getHostProvider().apply(connectStringParser.getServerAddresses());
|
|
|
|
+ } else {
|
|
|
|
+ hostProvider = new StaticHostProvider(connectStringParser.getServerAddresses());
|
|
|
|
+ }
|
|
this.hostProvider = hostProvider;
|
|
this.hostProvider = hostProvider;
|
|
|
|
|
|
- cnxn = new ClientCnxn(
|
|
|
|
|
|
+ cnxn = createConnection(
|
|
connectStringParser.getChrootPath(),
|
|
connectStringParser.getChrootPath(),
|
|
hostProvider,
|
|
hostProvider,
|
|
sessionTimeout,
|
|
sessionTimeout,
|
|
@@ -1048,7 +1103,7 @@ public class ZooKeeper implements AutoCloseable {
|
|
sessionId,
|
|
sessionId,
|
|
sessionPasswd,
|
|
sessionPasswd,
|
|
canBeReadOnly);
|
|
canBeReadOnly);
|
|
- cnxn.seenRwServerBefore = true; // since user has provided sessionId
|
|
|
|
|
|
+ cnxn.seenRwServerBefore = sessionId != 0; // since user has provided sessionId
|
|
cnxn.start();
|
|
cnxn.start();
|
|
}
|
|
}
|
|
|
|
|
|
@@ -1120,19 +1175,11 @@ public class ZooKeeper implements AutoCloseable {
|
|
long sessionId,
|
|
long sessionId,
|
|
byte[] sessionPasswd,
|
|
byte[] sessionPasswd,
|
|
boolean canBeReadOnly) throws IOException {
|
|
boolean canBeReadOnly) throws IOException {
|
|
- this(
|
|
|
|
- connectString,
|
|
|
|
- sessionTimeout,
|
|
|
|
- watcher,
|
|
|
|
- sessionId,
|
|
|
|
- sessionPasswd,
|
|
|
|
- canBeReadOnly,
|
|
|
|
- createDefaultHostProvider(connectString));
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- // default hostprovider
|
|
|
|
- private static HostProvider createDefaultHostProvider(String connectString) {
|
|
|
|
- return new StaticHostProvider(new ConnectStringParser(connectString).getServerAddresses());
|
|
|
|
|
|
+ this(new ZooKeeperBuilder(connectString, sessionTimeout)
|
|
|
|
+ .withDefaultWatcher(watcher)
|
|
|
|
+ .withSession(sessionId, sessionPasswd)
|
|
|
|
+ .withCanBeReadOnly(canBeReadOnly)
|
|
|
|
+ .toOptions());
|
|
}
|
|
}
|
|
|
|
|
|
// VisibleForTesting
|
|
// VisibleForTesting
|