|
@@ -73,8 +73,7 @@ import org.apache.zookeeper.server.ZooTrace;
|
|
|
public class ClientCnxn {
|
|
|
private static final Logger LOG = Logger.getLogger(ClientCnxn.class);
|
|
|
|
|
|
- private ArrayList<InetSocketAddress> serverAddrs =
|
|
|
- new ArrayList<InetSocketAddress>();
|
|
|
+ private ArrayList<InetSocketAddress> serverAddrs = new ArrayList<InetSocketAddress>();
|
|
|
|
|
|
static class AuthData {
|
|
|
AuthData(String scheme, byte data[]) {
|
|
@@ -121,10 +120,12 @@ public class ClientCnxn {
|
|
|
|
|
|
final Selector selector = Selector.open();
|
|
|
|
|
|
- /** Set to true when close is called. Latches the connection such that
|
|
|
- * we don't attempt to re-connect to the server if in the middle of
|
|
|
- * closing the connection (client sends session disconnect to server
|
|
|
- * as part of close operation) */
|
|
|
+ /**
|
|
|
+ * Set to true when close is called. Latches the connection such that we
|
|
|
+ * don't attempt to re-connect to the server if in the middle of closing the
|
|
|
+ * connection (client sends session disconnect to server as part of close
|
|
|
+ * operation)
|
|
|
+ */
|
|
|
volatile boolean closing = false;
|
|
|
|
|
|
public long getSessionId() {
|
|
@@ -138,7 +139,8 @@ public class ClientCnxn {
|
|
|
@Override
|
|
|
public String toString() {
|
|
|
StringBuffer sb = new StringBuffer();
|
|
|
- sb.append("sessionId: 0x").append(Long.toHexString(getSessionId())).append("\n");
|
|
|
+ sb.append("sessionId: 0x").append(Long.toHexString(getSessionId()))
|
|
|
+ .append("\n");
|
|
|
sb.append("lastZxid: ").append(lastZxid).append("\n");
|
|
|
sb.append("xid: ").append(xid).append("\n");
|
|
|
sb.append("nextAddrToTry: ").append(nextAddrToTry).append("\n");
|
|
@@ -200,8 +202,25 @@ public class ClientCnxn {
|
|
|
}
|
|
|
this.watchRegistration = watchRegistration;
|
|
|
}
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public String toString() {
|
|
|
+ StringBuffer sb = new StringBuffer();
|
|
|
+
|
|
|
+ sb.append("path:" + path);
|
|
|
+ sb.append(" finished:" + finished);
|
|
|
+
|
|
|
+ sb.append(" header:: " + header);
|
|
|
+ sb.append(" replyHeader:: " + replyHeader);
|
|
|
+ sb.append(" request:: " + request);
|
|
|
+ sb.append(" response:: " + response);
|
|
|
+
|
|
|
+ // jute toString is horrible, remove unnecessary newlines
|
|
|
+ return sb.toString().replaceAll("\r*\n+", " ");
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
+
|
|
|
public ClientCnxn(String hosts, int sessionTimeout, ZooKeeper zooKeeper,
|
|
|
ClientWatchManager watcher)
|
|
|
throws IOException
|
|
@@ -224,8 +243,7 @@ public class ClientCnxn {
|
|
|
*/
|
|
|
public ClientCnxn(String hosts, int sessionTimeout, ZooKeeper zooKeeper,
|
|
|
ClientWatchManager watcher, long sessionId, byte[] sessionPasswd)
|
|
|
- throws IOException
|
|
|
- {
|
|
|
+ throws IOException {
|
|
|
this.zooKeeper = zooKeeper;
|
|
|
this.watcher = watcher;
|
|
|
this.sessionId = sessionId;
|
|
@@ -477,20 +495,28 @@ public class ClientCnxn {
|
|
|
ByteBufferInputStream bbis = new ByteBufferInputStream(
|
|
|
incomingBuffer);
|
|
|
BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
|
|
|
- ReplyHeader r = new ReplyHeader();
|
|
|
+ ReplyHeader replyHdr = new ReplyHeader();
|
|
|
|
|
|
- r.deserialize(bbia, "header");
|
|
|
- if (r.getXid() == -2) {
|
|
|
+ replyHdr.deserialize(bbia, "header");
|
|
|
+ if (replyHdr.getXid() == -2) {
|
|
|
// -2 is the xid for pings
|
|
|
+ LOG
|
|
|
+ .debug("Got ping sessionid:0x"
|
|
|
+ + Long.toHexString(sessionId));
|
|
|
return;
|
|
|
}
|
|
|
- if (r.getXid() == -4) {
|
|
|
+ if (replyHdr.getXid() == -4) {
|
|
|
// -2 is the xid for AuthPacket
|
|
|
// TODO: process AuthPacket here
|
|
|
+ LOG
|
|
|
+ .debug("Got auth sessionid:0x"
|
|
|
+ + Long.toHexString(sessionId));
|
|
|
return;
|
|
|
}
|
|
|
- if (r.getXid() == -1) {
|
|
|
+ if (replyHdr.getXid() == -1) {
|
|
|
// -1 means notification
|
|
|
+ LOG.debug("Got notification sessionid:0x"
|
|
|
+ + Long.toHexString(sessionId));
|
|
|
WatcherEvent event = new WatcherEvent();
|
|
|
event.deserialize(bbia, "response");
|
|
|
WatchedEvent we = new WatchedEvent(event);
|
|
@@ -504,29 +530,37 @@ public class ClientCnxn {
|
|
|
}
|
|
|
if (pendingQueue.size() == 0) {
|
|
|
throw new IOException("Nothing in the queue, but got "
|
|
|
- + r.getXid());
|
|
|
+ + replyHdr.getXid());
|
|
|
}
|
|
|
- Packet p = null;
|
|
|
+ Packet packet = null;
|
|
|
synchronized (pendingQueue) {
|
|
|
- p = pendingQueue.remove();
|
|
|
+ packet = pendingQueue.remove();
|
|
|
}
|
|
|
/*
|
|
|
* Since requests are processed in order, we better get a response
|
|
|
* to the first request!
|
|
|
*/
|
|
|
- if (p.header.getXid() != r.getXid()) {
|
|
|
- throw new IOException("Xid out of order. Got " + r.getXid()
|
|
|
- + " expected " + p.header.getXid());
|
|
|
+ if (packet.header.getXid() != replyHdr.getXid()) {
|
|
|
+ throw new IOException("Xid out of order. Got "
|
|
|
+ + replyHdr.getXid() + " expected "
|
|
|
+ + packet.header.getXid());
|
|
|
}
|
|
|
- p.replyHeader.setXid(r.getXid());
|
|
|
- p.replyHeader.setErr(r.getErr());
|
|
|
- p.replyHeader.setZxid(r.getZxid());
|
|
|
- lastZxid = r.getZxid();
|
|
|
- if (p.response != null && r.getErr() == 0) {
|
|
|
- p.response.deserialize(bbia, "response");
|
|
|
+
|
|
|
+ packet.replyHeader.setXid(replyHdr.getXid());
|
|
|
+ packet.replyHeader.setErr(replyHdr.getErr());
|
|
|
+ packet.replyHeader.setZxid(replyHdr.getZxid());
|
|
|
+ lastZxid = replyHdr.getZxid();
|
|
|
+ if (packet.response != null && replyHdr.getErr() == 0) {
|
|
|
+ packet.response.deserialize(bbia, "response");
|
|
|
}
|
|
|
- p.finished = true;
|
|
|
- finishPacket(p);
|
|
|
+ packet.finished = true;
|
|
|
+
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
+ LOG.debug("Reading reply sessionid:0x"
|
|
|
+ + Long.toHexString(sessionId) + ", packet:: " + packet);
|
|
|
+ }
|
|
|
+
|
|
|
+ finishPacket(packet);
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -789,14 +823,15 @@ public class ClientCnxn {
|
|
|
} catch (Exception e) {
|
|
|
if (closing) {
|
|
|
// closing so this is expected
|
|
|
- LOG.info("Exception while closing send thread for session 0x"
|
|
|
+ LOG
|
|
|
+ .info("Exception while closing send thread for session 0x"
|
|
|
+ Long.toHexString(getSessionId())
|
|
|
+ " : " + e.getMessage());
|
|
|
break;
|
|
|
} else {
|
|
|
LOG.warn("Exception closing session 0x"
|
|
|
- + Long.toHexString(getSessionId()),
|
|
|
- e);
|
|
|
+ + Long.toHexString(getSessionId()) + " to "
|
|
|
+ + sockKey, e);
|
|
|
cleanup();
|
|
|
if (zooKeeper.state.isAlive()) {
|
|
|
eventThread.queueEvent(new WatchedEvent(
|
|
@@ -889,8 +924,8 @@ public class ClientCnxn {
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * Close the connection, which includes; send session disconnect to
|
|
|
- * the server, shutdown the send/event threads.
|
|
|
+ * Close the connection, which includes; send session disconnect to the
|
|
|
+ * server, shutdown the send/event threads.
|
|
|
*
|
|
|
* @throws IOException
|
|
|
*/
|
|
@@ -919,13 +954,10 @@ public class ClientCnxn {
|
|
|
}
|
|
|
|
|
|
public ReplyHeader submitRequest(RequestHeader h, Record request,
|
|
|
- Record response,
|
|
|
- WatchRegistration watchRegistration)
|
|
|
- throws InterruptedException
|
|
|
- {
|
|
|
+ Record response, WatchRegistration watchRegistration)
|
|
|
+ throws InterruptedException {
|
|
|
ReplyHeader r = new ReplyHeader();
|
|
|
- Packet packet =
|
|
|
- queuePacket(h, r, request, response, null, null, null,
|
|
|
+ Packet packet = queuePacket(h, r, request, response, null, null, null,
|
|
|
watchRegistration);
|
|
|
synchronized (packet) {
|
|
|
while (!packet.finished) {
|
|
@@ -937,8 +969,7 @@ public class ClientCnxn {
|
|
|
|
|
|
Packet queuePacket(RequestHeader h, ReplyHeader r, Record request,
|
|
|
Record response, AsyncCallback cb, String path, Object ctx,
|
|
|
- WatchRegistration watchRegistration)
|
|
|
- {
|
|
|
+ WatchRegistration watchRegistration) {
|
|
|
Packet packet = null;
|
|
|
synchronized (outgoingQueue) {
|
|
|
if (h.getType() != OpCode.ping && h.getType() != OpCode.auth) {
|