|
@@ -21,12 +21,9 @@ package org.apache.zookeeper;
|
|
|
import java.io.ByteArrayOutputStream;
|
|
|
import java.io.IOException;
|
|
|
import java.lang.Thread.UncaughtExceptionHandler;
|
|
|
-import java.net.InetAddress;
|
|
|
import java.net.InetSocketAddress;
|
|
|
import java.net.SocketAddress;
|
|
|
import java.nio.ByteBuffer;
|
|
|
-import java.util.ArrayList;
|
|
|
-import java.util.Collections;
|
|
|
import java.util.LinkedList;
|
|
|
import java.util.Random;
|
|
|
import java.util.Set;
|
|
@@ -50,7 +47,7 @@ import org.apache.zookeeper.Watcher.Event.KeeperState;
|
|
|
import org.apache.zookeeper.ZooDefs.OpCode;
|
|
|
import org.apache.zookeeper.ZooKeeper.States;
|
|
|
import org.apache.zookeeper.ZooKeeper.WatchRegistration;
|
|
|
-import org.apache.zookeeper.common.PathUtils;
|
|
|
+import org.apache.zookeeper.client.HostProvider;
|
|
|
import org.apache.zookeeper.proto.AuthPacket;
|
|
|
import org.apache.zookeeper.proto.ConnectRequest;
|
|
|
import org.apache.zookeeper.proto.CreateResponse;
|
|
@@ -93,9 +90,6 @@ public class ClientCnxn {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private final ArrayList<InetSocketAddress> serverAddrs =
|
|
|
- new ArrayList<InetSocketAddress>();
|
|
|
-
|
|
|
static class AuthData {
|
|
|
AuthData(String scheme, byte data[]) {
|
|
|
this.scheme = scheme;
|
|
@@ -119,8 +113,6 @@ public class ClientCnxn {
|
|
|
*/
|
|
|
private final LinkedList<Packet> outgoingQueue = new LinkedList<Packet>();
|
|
|
|
|
|
- private int nextAddrToTry = 0;
|
|
|
-
|
|
|
private int connectTimeout;
|
|
|
|
|
|
/**
|
|
@@ -156,6 +148,11 @@ public class ClientCnxn {
|
|
|
* operation)
|
|
|
*/
|
|
|
private volatile boolean closing = false;
|
|
|
+
|
|
|
+ /**
|
|
|
+ * A set of ZooKeeper hosts this client could connect to.
|
|
|
+ */
|
|
|
+ private final HostProvider hostProvider;
|
|
|
|
|
|
public long getSessionId() {
|
|
|
return sessionId;
|
|
@@ -270,8 +267,9 @@ public class ClientCnxn {
|
|
|
* established until needed. The start() instance method must be called
|
|
|
* subsequent to construction.
|
|
|
*
|
|
|
- * @param hosts
|
|
|
- * a comma separated list of hosts that can be connected to.
|
|
|
+ * @param chrootPath - the chroot of this client. Should be removed from this Class in ZOOKEEPER-838
|
|
|
+ * @param hostProvider
|
|
|
+ * the list of ZooKeeper servers to connect to
|
|
|
* @param sessionTimeout
|
|
|
* the timeout for connections.
|
|
|
* @param zooKeeper
|
|
@@ -281,10 +279,10 @@ public class ClientCnxn {
|
|
|
* the socket implementation used (e.g. NIO/Netty)
|
|
|
* @throws IOException
|
|
|
*/
|
|
|
- public ClientCnxn(String hosts, int sessionTimeout, ZooKeeper zooKeeper,
|
|
|
+ public ClientCnxn(String chrootPath, HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper,
|
|
|
ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket)
|
|
|
throws IOException {
|
|
|
- this(hosts, sessionTimeout, zooKeeper, watcher, clientCnxnSocket, 0, new byte[16]);
|
|
|
+ this(chrootPath, hostProvider, sessionTimeout, zooKeeper, watcher, clientCnxnSocket, 0, new byte[16]);
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -292,8 +290,9 @@ public class ClientCnxn {
|
|
|
* established until needed. The start() instance method must be called
|
|
|
* subsequent to construction.
|
|
|
*
|
|
|
- * @param hosts
|
|
|
- * a comma separated list of hosts that can be connected to.
|
|
|
+ * @param chrootPath - the chroot of this client. Should be removed from this Class in ZOOKEEPER-838
|
|
|
+ * @param hostProvider
|
|
|
+ * the list of ZooKeeper servers to connect to
|
|
|
* @param sessionTimeout
|
|
|
* the timeout for connections.
|
|
|
* @param zooKeeper
|
|
@@ -305,50 +304,20 @@ public class ClientCnxn {
|
|
|
* @param sessionPasswd session passwd if re-establishing session
|
|
|
* @throws IOException
|
|
|
*/
|
|
|
- public ClientCnxn(String hosts, int sessionTimeout, ZooKeeper zooKeeper,
|
|
|
+ public ClientCnxn(String chrootPath, HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper,
|
|
|
ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket,
|
|
|
- long sessionId, byte[] sessionPasswd) throws IOException {
|
|
|
+ long sessionId, byte[] sessionPasswd) {
|
|
|
this.zooKeeper = zooKeeper;
|
|
|
this.watcher = watcher;
|
|
|
this.sessionId = sessionId;
|
|
|
this.sessionPasswd = sessionPasswd;
|
|
|
-
|
|
|
- // parse out chroot, if any
|
|
|
- int off = hosts.indexOf('/');
|
|
|
- if (off >= 0) {
|
|
|
- String chrootPath = hosts.substring(off);
|
|
|
- // ignore "/" chroot spec, same as null
|
|
|
- if (chrootPath.length() == 1) {
|
|
|
- this.chrootPath = null;
|
|
|
- } else {
|
|
|
- PathUtils.validatePath(chrootPath);
|
|
|
- this.chrootPath = chrootPath;
|
|
|
- }
|
|
|
- hosts = hosts.substring(0, off);
|
|
|
- } else {
|
|
|
- this.chrootPath = null;
|
|
|
- }
|
|
|
-
|
|
|
- String hostsList[] = hosts.split(",");
|
|
|
- for (String host : hostsList) {
|
|
|
- int port = 2181;
|
|
|
- int pidx = host.lastIndexOf(':');
|
|
|
- if (pidx >= 0) {
|
|
|
- // otherwise : is at the end of the string, ignore
|
|
|
- if (pidx < host.length() - 1) {
|
|
|
- port = Integer.parseInt(host.substring(pidx + 1));
|
|
|
- }
|
|
|
- host = host.substring(0, pidx);
|
|
|
- }
|
|
|
- InetAddress addrs[] = InetAddress.getAllByName(host);
|
|
|
- for (InetAddress addr : addrs) {
|
|
|
- serverAddrs.add(new InetSocketAddress(addr, port));
|
|
|
- }
|
|
|
- }
|
|
|
this.sessionTimeout = sessionTimeout;
|
|
|
- connectTimeout = sessionTimeout / hostsList.length;
|
|
|
+ this.hostProvider = hostProvider;
|
|
|
+ this.chrootPath = chrootPath;
|
|
|
+
|
|
|
+ connectTimeout = sessionTimeout / hostProvider.size();
|
|
|
readTimeout = sessionTimeout * 2 / 3;
|
|
|
- Collections.shuffle(serverAddrs);
|
|
|
+
|
|
|
sendThread = new SendThread(clientCnxnSocket);
|
|
|
eventThread = new EventThread();
|
|
|
}
|
|
@@ -655,9 +624,8 @@ public class ClientCnxn {
|
|
|
class SendThread extends Thread {
|
|
|
private long lastPingSentNs;
|
|
|
private final ClientCnxnSocket clientCnxnSocket;
|
|
|
- private int lastConnectIndex = -1;
|
|
|
- private int currentConnectIndex;
|
|
|
- private Random r = new Random(System.nanoTime());
|
|
|
+ private Random r = new Random(System.nanoTime());
|
|
|
+ private boolean isFirstConnect = true;
|
|
|
|
|
|
void readResponse(ByteBuffer incomingBuffer) throws IOException {
|
|
|
ByteBufferInputStream bbis = new ByteBufferInputStream(
|
|
@@ -789,7 +757,7 @@ public class ClientCnxn {
|
|
|
void primeConnection() throws IOException {
|
|
|
LOG.info("Socket connection established to "
|
|
|
+ clientCnxnSocket.getRemoteSocketAddress() + ", initiating session");
|
|
|
- lastConnectIndex = currentConnectIndex;
|
|
|
+ isFirstConnect = false;
|
|
|
ConnectRequest conReq = new ConnectRequest(0, lastZxid,
|
|
|
sessionTimeout, sessionId, sessionPasswd);
|
|
|
synchronized (outgoingQueue) {
|
|
@@ -834,32 +802,16 @@ public class ClientCnxn {
|
|
|
}
|
|
|
|
|
|
private void startConnect() throws IOException {
|
|
|
- if (lastConnectIndex == -1) {
|
|
|
- // We don't want to delay the first try at a connect, so we
|
|
|
- // start with -1 the first time around
|
|
|
- lastConnectIndex = 0;
|
|
|
- } else {
|
|
|
+ if(!isFirstConnect){
|
|
|
try {
|
|
|
Thread.sleep(r.nextInt(1000));
|
|
|
} catch (InterruptedException e1) {
|
|
|
LOG.warn("Unexpected exception", e1);
|
|
|
}
|
|
|
- if (nextAddrToTry == lastConnectIndex) {
|
|
|
- try {
|
|
|
- // Try not to spin too fast!
|
|
|
- Thread.sleep(1000);
|
|
|
- } catch (InterruptedException e) {
|
|
|
- LOG.warn("Unexpected exception", e);
|
|
|
- }
|
|
|
- }
|
|
|
}
|
|
|
state = States.CONNECTING;
|
|
|
- currentConnectIndex = nextAddrToTry;
|
|
|
- InetSocketAddress addr = serverAddrs.get(nextAddrToTry);
|
|
|
- nextAddrToTry++;
|
|
|
- if (nextAddrToTry == serverAddrs.size()) {
|
|
|
- nextAddrToTry = 0;
|
|
|
- }
|
|
|
+ InetSocketAddress addr = hostProvider.next(1000);
|
|
|
+
|
|
|
LOG.info("Opening socket connection to server " + addr);
|
|
|
|
|
|
setName(getName().replaceAll("\\(.*\\)",
|
|
@@ -876,6 +828,7 @@ public class ClientCnxn {
|
|
|
clientCnxnSocket.introduce(this,sessionId);
|
|
|
clientCnxnSocket.updateNow();
|
|
|
clientCnxnSocket.updateLastSendAndHeard();
|
|
|
+ int to;
|
|
|
while (state.isAlive()) {
|
|
|
try {
|
|
|
if (!clientCnxnSocket.isConnected()) {
|
|
@@ -886,11 +839,13 @@ public class ClientCnxn {
|
|
|
startConnect();
|
|
|
clientCnxnSocket.updateLastSendAndHeard();
|
|
|
}
|
|
|
-
|
|
|
- int to = readTimeout - clientCnxnSocket.getIdleRecv();
|
|
|
- if (state != States.CONNECTED) {
|
|
|
+
|
|
|
+ if (state == States.CONNECTED) {
|
|
|
+ to = readTimeout - clientCnxnSocket.getIdleRecv();
|
|
|
+ } else {
|
|
|
to = connectTimeout - clientCnxnSocket.getIdleRecv();
|
|
|
}
|
|
|
+
|
|
|
if (to <= 0) {
|
|
|
throw new SessionTimeoutException(
|
|
|
"Client session timed out, have not heard from server in "
|
|
@@ -1002,7 +957,8 @@ public class ClientCnxn {
|
|
|
+ Long.toHexString(sessionId) + " has expired");
|
|
|
}
|
|
|
readTimeout = negotiatedSessionTimeout * 2 / 3;
|
|
|
- connectTimeout = negotiatedSessionTimeout / serverAddrs.size();
|
|
|
+ connectTimeout = negotiatedSessionTimeout / hostProvider.size();
|
|
|
+ hostProvider.onConnected();
|
|
|
sessionId = _sessionId;
|
|
|
sessionPasswd = _sessionPasswd;
|
|
|
state = States.CONNECTED;
|