|
@@ -76,18 +76,18 @@ import org.apache.zookeeper.server.ZooTrace;
|
|
*/
|
|
*/
|
|
public class ClientCnxn {
|
|
public class ClientCnxn {
|
|
private static final Logger LOG = Logger.getLogger(ClientCnxn.class);
|
|
private static final Logger LOG = Logger.getLogger(ClientCnxn.class);
|
|
-
|
|
|
|
|
|
+
|
|
/** This controls whether automatic watch resetting is enabled.
|
|
/** This controls whether automatic watch resetting is enabled.
|
|
* Clients automatically reset watches during session reconnect, this
|
|
* Clients automatically reset watches during session reconnect, this
|
|
* option allows the client to turn off this behavior by setting
|
|
* option allows the client to turn off this behavior by setting
|
|
* the environment variable "zookeeper.disableAutoWatchReset" to "true" */
|
|
* the environment variable "zookeeper.disableAutoWatchReset" to "true" */
|
|
private static boolean disableAutoWatchReset;
|
|
private static boolean disableAutoWatchReset;
|
|
-
|
|
|
|
|
|
+
|
|
public static final int packetLen;
|
|
public static final int packetLen;
|
|
static {
|
|
static {
|
|
- // this var should not be public, but otw there is no easy way
|
|
|
|
|
|
+ // this var should not be public, but otw there is no easy way
|
|
// to test
|
|
// to test
|
|
- disableAutoWatchReset =
|
|
|
|
|
|
+ disableAutoWatchReset =
|
|
Boolean.getBoolean("zookeeper.disableAutoWatchReset");
|
|
Boolean.getBoolean("zookeeper.disableAutoWatchReset");
|
|
LOG.info("zookeeper.disableAutoWatchReset is " + disableAutoWatchReset);
|
|
LOG.info("zookeeper.disableAutoWatchReset is " + disableAutoWatchReset);
|
|
packetLen = Integer.getInteger("jute.maxbuffer", 4096 * 1024);
|
|
packetLen = Integer.getInteger("jute.maxbuffer", 4096 * 1024);
|
|
@@ -105,7 +105,7 @@ public class ClientCnxn {
|
|
|
|
|
|
byte data[];
|
|
byte data[];
|
|
}
|
|
}
|
|
-
|
|
|
|
|
|
+
|
|
private ArrayList<AuthData> authInfo = new ArrayList<AuthData>();
|
|
private ArrayList<AuthData> authInfo = new ArrayList<AuthData>();
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -141,7 +141,7 @@ public class ClientCnxn {
|
|
final EventThread eventThread;
|
|
final EventThread eventThread;
|
|
|
|
|
|
final Selector selector = Selector.open();
|
|
final Selector selector = Selector.open();
|
|
-
|
|
|
|
|
|
+
|
|
/**
|
|
/**
|
|
* Set to true when close is called. Latches the connection such that we
|
|
* Set to true when close is called. Latches the connection such that we
|
|
* don't attempt to re-connect to the server if in the middle of closing the
|
|
* don't attempt to re-connect to the server if in the middle of closing the
|
|
@@ -153,7 +153,7 @@ public class ClientCnxn {
|
|
public long getSessionId() {
|
|
public long getSessionId() {
|
|
return sessionId;
|
|
return sessionId;
|
|
}
|
|
}
|
|
-
|
|
|
|
|
|
+
|
|
public byte[] getSessionPasswd() {
|
|
public byte[] getSessionPasswd() {
|
|
return sessionPasswd;
|
|
return sessionPasswd;
|
|
}
|
|
}
|
|
@@ -329,7 +329,7 @@ public class ClientCnxn {
|
|
sendThread = new SendThread();
|
|
sendThread = new SendThread();
|
|
eventThread = new EventThread();
|
|
eventThread = new EventThread();
|
|
}
|
|
}
|
|
-
|
|
|
|
|
|
+
|
|
/**
|
|
/**
|
|
* tests use this to check on reset of watches
|
|
* tests use this to check on reset of watches
|
|
* @return if the auto reset of watches are disabled
|
|
* @return if the auto reset of watches are disabled
|
|
@@ -360,15 +360,15 @@ public class ClientCnxn {
|
|
private static class WatcherSetEventPair {
|
|
private static class WatcherSetEventPair {
|
|
private final Set<Watcher> watchers;
|
|
private final Set<Watcher> watchers;
|
|
private final WatchedEvent event;
|
|
private final WatchedEvent event;
|
|
-
|
|
|
|
|
|
+
|
|
public WatcherSetEventPair(Set<Watcher> watchers, WatchedEvent event) {
|
|
public WatcherSetEventPair(Set<Watcher> watchers, WatchedEvent event) {
|
|
this.watchers = watchers;
|
|
this.watchers = watchers;
|
|
this.event = event;
|
|
this.event = event;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
-
|
|
|
|
|
|
+
|
|
class EventThread extends Thread {
|
|
class EventThread extends Thread {
|
|
- private final LinkedBlockingQueue<Object> waitingEvents =
|
|
|
|
|
|
+ private final LinkedBlockingQueue<Object> waitingEvents =
|
|
new LinkedBlockingQueue<Object>();
|
|
new LinkedBlockingQueue<Object>();
|
|
|
|
|
|
/** This is really the queued session state until the event
|
|
/** This is really the queued session state until the event
|
|
@@ -382,9 +382,9 @@ public class ClientCnxn {
|
|
setUncaughtExceptionHandler(uncaughtExceptionHandler);
|
|
setUncaughtExceptionHandler(uncaughtExceptionHandler);
|
|
setDaemon(true);
|
|
setDaemon(true);
|
|
}
|
|
}
|
|
-
|
|
|
|
|
|
+
|
|
public void queueEvent(WatchedEvent event) {
|
|
public void queueEvent(WatchedEvent event) {
|
|
- if (event.getType() == EventType.None
|
|
|
|
|
|
+ if (event.getType() == EventType.None
|
|
&& sessionState == event.getState()) {
|
|
&& sessionState == event.getState()) {
|
|
return;
|
|
return;
|
|
}
|
|
}
|
|
@@ -398,7 +398,7 @@ public class ClientCnxn {
|
|
// queue the pair (watch set & event) for later processing
|
|
// queue the pair (watch set & event) for later processing
|
|
waitingEvents.add(pair);
|
|
waitingEvents.add(pair);
|
|
}
|
|
}
|
|
-
|
|
|
|
|
|
+
|
|
public void queuePacket(Packet packet) {
|
|
public void queuePacket(Packet packet) {
|
|
waitingEvents.add(packet);
|
|
waitingEvents.add(packet);
|
|
}
|
|
}
|
|
@@ -510,7 +510,7 @@ public class ClientCnxn {
|
|
} catch (InterruptedException e) {
|
|
} catch (InterruptedException e) {
|
|
LOG.error("Event thread exiting due to interruption", e);
|
|
LOG.error("Event thread exiting due to interruption", e);
|
|
}
|
|
}
|
|
-
|
|
|
|
|
|
+
|
|
LOG.info("EventThread shut down");
|
|
LOG.info("EventThread shut down");
|
|
}
|
|
}
|
|
}
|
|
}
|
|
@@ -609,7 +609,7 @@ public class ClientCnxn {
|
|
if (LOG.isDebugEnabled()) {
|
|
if (LOG.isDebugEnabled()) {
|
|
LOG.debug("Got ping response for sessionid:0x"
|
|
LOG.debug("Got ping response for sessionid:0x"
|
|
+ Long.toHexString(sessionId)
|
|
+ Long.toHexString(sessionId)
|
|
- + " after "
|
|
|
|
|
|
+ + " after "
|
|
+ ((System.nanoTime() - lastPingSentNs) / 1000000)
|
|
+ ((System.nanoTime() - lastPingSentNs) / 1000000)
|
|
+ "ms");
|
|
+ "ms");
|
|
}
|
|
}
|
|
@@ -640,7 +640,7 @@ public class ClientCnxn {
|
|
LOG.debug("Got " + we + " for sessionid 0x"
|
|
LOG.debug("Got " + we + " for sessionid 0x"
|
|
+ Long.toHexString(sessionId));
|
|
+ Long.toHexString(sessionId));
|
|
}
|
|
}
|
|
-
|
|
|
|
|
|
+
|
|
eventThread.queueEvent( we );
|
|
eventThread.queueEvent( we );
|
|
return;
|
|
return;
|
|
}
|
|
}
|
|
@@ -775,7 +775,7 @@ public class ClientCnxn {
|
|
}
|
|
}
|
|
|
|
|
|
SendThread() {
|
|
SendThread() {
|
|
- super(currentThread().getName() + "-SendThread");
|
|
|
|
|
|
+ super(currentThread().getName() + "-SendThread()");
|
|
zooKeeper.state = States.CONNECTING;
|
|
zooKeeper.state = States.CONNECTING;
|
|
setUncaughtExceptionHandler(uncaughtExceptionHandler);
|
|
setUncaughtExceptionHandler(uncaughtExceptionHandler);
|
|
setDaemon(true);
|
|
setDaemon(true);
|
|
@@ -867,6 +867,8 @@ public class ClientCnxn {
|
|
sock.socket().setSoLinger(false, -1);
|
|
sock.socket().setSoLinger(false, -1);
|
|
sock.socket().setTcpNoDelay(true);
|
|
sock.socket().setTcpNoDelay(true);
|
|
LOG.info("Attempting connection to server " + addr);
|
|
LOG.info("Attempting connection to server " + addr);
|
|
|
|
+ setName(getName().replaceAll("\\(.*\\)",
|
|
|
|
+ "(" + addr.getHostName() + ":" + addr.getPort() + ")"));
|
|
sockKey = sock.register(selector, SelectionKey.OP_CONNECT);
|
|
sockKey = sock.register(selector, SelectionKey.OP_CONNECT);
|
|
if (sock.connect(addr)) {
|
|
if (sock.connect(addr)) {
|
|
primeConnection(sockKey);
|
|
primeConnection(sockKey);
|
|
@@ -964,7 +966,7 @@ public class ClientCnxn {
|
|
+ " : " + e.getMessage());
|
|
+ " : " + e.getMessage());
|
|
break;
|
|
break;
|
|
} else {
|
|
} else {
|
|
- LOG.warn("Exception closing session 0x"
|
|
|
|
|
|
+ LOG.warn("Exception closing session 0x"
|
|
+ Long.toHexString(getSessionId()) + " to "
|
|
+ Long.toHexString(getSessionId()) + " to "
|
|
+ sockKey, e);
|
|
+ sockKey, e);
|
|
cleanup();
|
|
cleanup();
|
|
@@ -974,7 +976,7 @@ public class ClientCnxn {
|
|
Event.KeeperState.Disconnected,
|
|
Event.KeeperState.Disconnected,
|
|
null));
|
|
null));
|
|
}
|
|
}
|
|
-
|
|
|
|
|
|
+
|
|
now = System.currentTimeMillis();
|
|
now = System.currentTimeMillis();
|
|
lastHeard = now;
|
|
lastHeard = now;
|
|
lastSend = now;
|
|
lastSend = now;
|
|
@@ -1051,7 +1053,7 @@ public class ClientCnxn {
|
|
* behavior.
|
|
* behavior.
|
|
*/
|
|
*/
|
|
public void disconnect() {
|
|
public void disconnect() {
|
|
- LOG.info("Disconnecting ClientCnxn for session: 0x"
|
|
|
|
|
|
+ LOG.info("Disconnecting ClientCnxn for session: 0x"
|
|
+ Long.toHexString(getSessionId()));
|
|
+ Long.toHexString(getSessionId()));
|
|
|
|
|
|
sendThread.close();
|
|
sendThread.close();
|
|
@@ -1061,19 +1063,19 @@ public class ClientCnxn {
|
|
/**
|
|
/**
|
|
* Close the connection, which includes; send session disconnect to the
|
|
* Close the connection, which includes; send session disconnect to the
|
|
* server, shutdown the send/event threads.
|
|
* server, shutdown the send/event threads.
|
|
- *
|
|
|
|
|
|
+ *
|
|
* @throws IOException
|
|
* @throws IOException
|
|
*/
|
|
*/
|
|
public void close() throws IOException {
|
|
public void close() throws IOException {
|
|
- LOG.info("Closing ClientCnxn for session: 0x"
|
|
|
|
|
|
+ LOG.info("Closing ClientCnxn for session: 0x"
|
|
+ Long.toHexString(getSessionId()));
|
|
+ Long.toHexString(getSessionId()));
|
|
|
|
|
|
closing = true;
|
|
closing = true;
|
|
-
|
|
|
|
|
|
+
|
|
try {
|
|
try {
|
|
RequestHeader h = new RequestHeader();
|
|
RequestHeader h = new RequestHeader();
|
|
h.setType(ZooDefs.OpCode.closeSession);
|
|
h.setType(ZooDefs.OpCode.closeSession);
|
|
-
|
|
|
|
|
|
+
|
|
submitRequest(h, null, null, null);
|
|
submitRequest(h, null, null, null);
|
|
} catch (InterruptedException e) {
|
|
} catch (InterruptedException e) {
|
|
// ignore, close the send/event threads
|
|
// ignore, close the send/event threads
|