|
@@ -194,7 +194,13 @@ public class ClientCnxn {
|
|
|
* This class allows us to pass the headers and the relevant records around.
|
|
|
*/
|
|
|
static class Packet {
|
|
|
- RequestHeader header;
|
|
|
+ RequestHeader requestHeader;
|
|
|
+
|
|
|
+ ReplyHeader replyHeader;
|
|
|
+
|
|
|
+ Record request;
|
|
|
+
|
|
|
+ Record response;
|
|
|
|
|
|
ByteBuffer bb;
|
|
|
|
|
@@ -203,12 +209,6 @@ public class ClientCnxn {
|
|
|
/** Servers's view of the path (may differ due to chroot) **/
|
|
|
String serverPath;
|
|
|
|
|
|
- ReplyHeader replyHeader;
|
|
|
-
|
|
|
- Record request;
|
|
|
-
|
|
|
- Record response;
|
|
|
-
|
|
|
boolean finished;
|
|
|
|
|
|
AsyncCallback cb;
|
|
@@ -217,33 +217,33 @@ public class ClientCnxn {
|
|
|
|
|
|
WatchRegistration watchRegistration;
|
|
|
|
|
|
- Packet(RequestHeader header, ReplyHeader replyHeader, Record record,
|
|
|
- Record response, ByteBuffer bb,
|
|
|
- WatchRegistration watchRegistration) {
|
|
|
- this.header = header;
|
|
|
+ Packet(RequestHeader requestHeader, ReplyHeader replyHeader, Record request,
|
|
|
+ Record response, WatchRegistration watchRegistration) {
|
|
|
+ this.requestHeader = requestHeader;
|
|
|
this.replyHeader = replyHeader;
|
|
|
- this.request = record;
|
|
|
+ this.request = request;
|
|
|
this.response = response;
|
|
|
- if (bb != null) {
|
|
|
- this.bb = bb;
|
|
|
- } else {
|
|
|
- try {
|
|
|
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
|
|
|
- BinaryOutputArchive boa = BinaryOutputArchive
|
|
|
- .getArchive(baos);
|
|
|
- boa.writeInt(-1, "len"); // We'll fill this in later
|
|
|
- header.serialize(boa, "header");
|
|
|
- if (record != null) {
|
|
|
- record.serialize(boa, "request");
|
|
|
- }
|
|
|
- baos.close();
|
|
|
- this.bb = ByteBuffer.wrap(baos.toByteArray());
|
|
|
- this.bb.putInt(this.bb.capacity() - 4);
|
|
|
- this.bb.rewind();
|
|
|
- } catch (IOException e) {
|
|
|
- LOG.warn("Ignoring unexpected exception", e);
|
|
|
+
|
|
|
+ try {
|
|
|
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
|
|
|
+ BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
|
|
|
+ boa.writeInt(-1, "len"); // We'll fill this in later
|
|
|
+ if (requestHeader != null) {
|
|
|
+ requestHeader.serialize(boa, "header");
|
|
|
+ }
|
|
|
+ if (request instanceof ConnectRequest) {
|
|
|
+ request.serialize(boa, "connect");
|
|
|
+ } else if (request != null) {
|
|
|
+ request.serialize(boa, "request");
|
|
|
}
|
|
|
+ baos.close();
|
|
|
+ this.bb = ByteBuffer.wrap(baos.toByteArray());
|
|
|
+ this.bb.putInt(this.bb.capacity() - 4);
|
|
|
+ this.bb.rewind();
|
|
|
+ } catch (IOException e) {
|
|
|
+ LOG.warn("Ignoring unexpected exception", e);
|
|
|
}
|
|
|
+
|
|
|
this.watchRegistration = watchRegistration;
|
|
|
}
|
|
|
|
|
@@ -255,7 +255,7 @@ public class ClientCnxn {
|
|
|
sb.append(" serverPath:" + serverPath);
|
|
|
sb.append(" finished:" + finished);
|
|
|
|
|
|
- sb.append(" header:: " + header);
|
|
|
+ sb.append(" header:: " + requestHeader);
|
|
|
sb.append(" replyHeader:: " + replyHeader);
|
|
|
sb.append(" request:: " + request);
|
|
|
sb.append(" response:: " + response);
|
|
@@ -265,7 +265,6 @@ public class ClientCnxn {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-
|
|
|
/**
|
|
|
* Creates a connection object. The actual network connect doesn't get
|
|
|
* established until needed. The start() instance method must be called
|
|
@@ -731,14 +730,14 @@ public class ClientCnxn {
|
|
|
* to the first request!
|
|
|
*/
|
|
|
try {
|
|
|
- if (packet.header.getXid() != replyHdr.getXid()) {
|
|
|
+ if (packet.requestHeader.getXid() != replyHdr.getXid()) {
|
|
|
packet.replyHeader.setErr(
|
|
|
KeeperException.Code.CONNECTIONLOSS.intValue());
|
|
|
throw new IOException("Xid out of order. Got Xid "
|
|
|
+ replyHdr.getXid() + " with err " +
|
|
|
+ replyHdr.getErr() +
|
|
|
" expected Xid "
|
|
|
- + packet.header.getXid()
|
|
|
+ + packet.requestHeader.getXid()
|
|
|
+ " for a packet with details: "
|
|
|
+ packet );
|
|
|
}
|
|
@@ -793,14 +792,6 @@ public class ClientCnxn {
|
|
|
lastConnectIndex = currentConnectIndex;
|
|
|
ConnectRequest conReq = new ConnectRequest(0, lastZxid,
|
|
|
sessionTimeout, sessionId, sessionPasswd);
|
|
|
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
|
|
|
- BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
|
|
|
- boa.writeInt(-1, "len");
|
|
|
- conReq.serialize(boa, "connect");
|
|
|
- baos.close();
|
|
|
- ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());
|
|
|
- bb.putInt(bb.capacity() - 4);
|
|
|
- bb.rewind();
|
|
|
synchronized (outgoingQueue) {
|
|
|
// We add backwards since we are pushing into the front
|
|
|
// Only send if there's a pending watch
|
|
@@ -817,17 +808,16 @@ public class ClientCnxn {
|
|
|
RequestHeader h = new RequestHeader();
|
|
|
h.setType(ZooDefs.OpCode.setWatches);
|
|
|
h.setXid(-8);
|
|
|
- Packet packet = new Packet(h, new ReplyHeader(), sw, null, null,
|
|
|
- null);
|
|
|
+ Packet packet = new Packet(h, new ReplyHeader(), sw, null, null);
|
|
|
outgoingQueue.addFirst(packet);
|
|
|
}
|
|
|
|
|
|
for (AuthData id : authInfo) {
|
|
|
outgoingQueue.addFirst(new Packet(new RequestHeader(-4,
|
|
|
OpCode.auth), null, new AuthPacket(0, id.scheme,
|
|
|
- id.data), null, null, null));
|
|
|
+ id.data), null, null));
|
|
|
}
|
|
|
- outgoingQueue.addFirst((new Packet(null, null, null, null, bb,
|
|
|
+ outgoingQueue.addFirst((new Packet(null, null, conReq, null,
|
|
|
null)));
|
|
|
}
|
|
|
clientCnxnSocket.enableReadWriteOnly();
|
|
@@ -1106,8 +1096,7 @@ public class ClientCnxn {
|
|
|
if (h.getType() != OpCode.ping && h.getType() != OpCode.auth) {
|
|
|
h.setXid(getXid());
|
|
|
}
|
|
|
- packet = new Packet(h, r, request, response, null,
|
|
|
- watchRegistration);
|
|
|
+ packet = new Packet(h, r, request, response, watchRegistration);
|
|
|
packet.cb = cb;
|
|
|
packet.ctx = ctx;
|
|
|
packet.clientPath = clientPath;
|