|
@@ -336,87 +336,96 @@ public class ClientCnxn {
|
|
|
try {
|
|
|
while (true) {
|
|
|
Object event = waitingEvents.take();
|
|
|
- if (event == eventOfDeath) {
|
|
|
- break;
|
|
|
- }
|
|
|
-
|
|
|
- if (event instanceof WatcherSetEventPair) {
|
|
|
- // each watcher will process the event
|
|
|
- WatcherSetEventPair pair = (WatcherSetEventPair)event;
|
|
|
- for (Watcher watcher: pair.watchers) {
|
|
|
- watcher.process(pair.event);
|
|
|
- }
|
|
|
- } else {
|
|
|
- Packet p = (Packet) event;
|
|
|
- int rc = 0;
|
|
|
- String path = p.path;
|
|
|
- if (p.replyHeader.getErr() != 0) {
|
|
|
- rc = p.replyHeader.getErr();
|
|
|
+ try {
|
|
|
+ if (event == eventOfDeath) {
|
|
|
+ break;
|
|
|
}
|
|
|
- if (p.cb == null) {
|
|
|
- LOG.warn("Somehow a null cb got to EventThread!");
|
|
|
- } else if (p.response instanceof ExistsResponse
|
|
|
- || p.response instanceof SetDataResponse
|
|
|
- || p.response instanceof SetACLResponse) {
|
|
|
- StatCallback cb = (StatCallback) p.cb;
|
|
|
- if (rc == 0) {
|
|
|
- if (p.response instanceof ExistsResponse) {
|
|
|
- cb.processResult(rc, path, p.ctx,
|
|
|
- ((ExistsResponse) p.response)
|
|
|
- .getStat());
|
|
|
- } else if (p.response instanceof SetDataResponse) {
|
|
|
- cb.processResult(rc, path, p.ctx,
|
|
|
- ((SetDataResponse) p.response)
|
|
|
- .getStat());
|
|
|
- } else if (p.response instanceof SetACLResponse) {
|
|
|
- cb.processResult(rc, path, p.ctx,
|
|
|
- ((SetACLResponse) p.response)
|
|
|
- .getStat());
|
|
|
+
|
|
|
+ if (event instanceof WatcherSetEventPair) {
|
|
|
+ // each watcher will process the event
|
|
|
+ WatcherSetEventPair pair = (WatcherSetEventPair) event;
|
|
|
+ for (Watcher watcher : pair.watchers) {
|
|
|
+ try {
|
|
|
+ watcher.process(pair.event);
|
|
|
+ } catch (Throwable t) {
|
|
|
+ LOG.error("Error while calling watcher ", t);
|
|
|
}
|
|
|
- } else {
|
|
|
- cb.processResult(rc, path, p.ctx, null);
|
|
|
- }
|
|
|
- } else if (p.response instanceof GetDataResponse) {
|
|
|
- DataCallback cb = (DataCallback) p.cb;
|
|
|
- GetDataResponse rsp = (GetDataResponse) p.response;
|
|
|
- if (rc == 0) {
|
|
|
- cb.processResult(rc, path, p.ctx,
|
|
|
- rsp.getData(), rsp.getStat());
|
|
|
- } else {
|
|
|
- cb.processResult(rc, path, p.ctx, null, null);
|
|
|
- }
|
|
|
- } else if (p.response instanceof GetACLResponse) {
|
|
|
- ACLCallback cb = (ACLCallback) p.cb;
|
|
|
- GetACLResponse rsp = (GetACLResponse) p.response;
|
|
|
- if (rc == 0) {
|
|
|
- cb.processResult(rc, path, p.ctx, rsp.getAcl(),
|
|
|
- rsp.getStat());
|
|
|
- } else {
|
|
|
- cb.processResult(rc, path, p.ctx, null, null);
|
|
|
}
|
|
|
- } else if (p.response instanceof GetChildrenResponse) {
|
|
|
- ChildrenCallback cb = (ChildrenCallback) p.cb;
|
|
|
- GetChildrenResponse rsp = (GetChildrenResponse) p.response;
|
|
|
- if (rc == 0) {
|
|
|
- cb.processResult(rc, path, p.ctx, rsp
|
|
|
- .getChildren());
|
|
|
- } else {
|
|
|
- cb.processResult(rc, path, p.ctx, null);
|
|
|
+ } else {
|
|
|
+ Packet p = (Packet) event;
|
|
|
+ int rc = 0;
|
|
|
+ String path = p.path;
|
|
|
+ if (p.replyHeader.getErr() != 0) {
|
|
|
+ rc = p.replyHeader.getErr();
|
|
|
}
|
|
|
- } else if (p.response instanceof CreateResponse) {
|
|
|
- StringCallback cb = (StringCallback) p.cb;
|
|
|
- CreateResponse rsp = (CreateResponse) p.response;
|
|
|
- if (rc == 0) {
|
|
|
- cb
|
|
|
- .processResult(rc, path, p.ctx, rsp
|
|
|
- .getPath());
|
|
|
- } else {
|
|
|
- cb.processResult(rc, path, p.ctx, null);
|
|
|
+ if (p.cb == null) {
|
|
|
+ LOG.warn("Somehow a null cb got to EventThread!");
|
|
|
+ } else if (p.response instanceof ExistsResponse
|
|
|
+ || p.response instanceof SetDataResponse
|
|
|
+ || p.response instanceof SetACLResponse) {
|
|
|
+ StatCallback cb = (StatCallback) p.cb;
|
|
|
+ if (rc == 0) {
|
|
|
+ if (p.response instanceof ExistsResponse) {
|
|
|
+ cb.processResult(rc, path, p.ctx,
|
|
|
+ ((ExistsResponse) p.response)
|
|
|
+ .getStat());
|
|
|
+ } else if (p.response instanceof SetDataResponse) {
|
|
|
+ cb.processResult(rc, path, p.ctx,
|
|
|
+ ((SetDataResponse) p.response)
|
|
|
+ .getStat());
|
|
|
+ } else if (p.response instanceof SetACLResponse) {
|
|
|
+ cb.processResult(rc, path, p.ctx,
|
|
|
+ ((SetACLResponse) p.response)
|
|
|
+ .getStat());
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ cb.processResult(rc, path, p.ctx, null);
|
|
|
+ }
|
|
|
+ } else if (p.response instanceof GetDataResponse) {
|
|
|
+ DataCallback cb = (DataCallback) p.cb;
|
|
|
+ GetDataResponse rsp = (GetDataResponse) p.response;
|
|
|
+ if (rc == 0) {
|
|
|
+ cb.processResult(rc, path, p.ctx, rsp
|
|
|
+ .getData(), rsp.getStat());
|
|
|
+ } else {
|
|
|
+ cb.processResult(rc, path, p.ctx, null,
|
|
|
+ null);
|
|
|
+ }
|
|
|
+ } else if (p.response instanceof GetACLResponse) {
|
|
|
+ ACLCallback cb = (ACLCallback) p.cb;
|
|
|
+ GetACLResponse rsp = (GetACLResponse) p.response;
|
|
|
+ if (rc == 0) {
|
|
|
+ cb.processResult(rc, path, p.ctx, rsp
|
|
|
+ .getAcl(), rsp.getStat());
|
|
|
+ } else {
|
|
|
+ cb.processResult(rc, path, p.ctx, null,
|
|
|
+ null);
|
|
|
+ }
|
|
|
+ } else if (p.response instanceof GetChildrenResponse) {
|
|
|
+ ChildrenCallback cb = (ChildrenCallback) p.cb;
|
|
|
+ GetChildrenResponse rsp = (GetChildrenResponse) p.response;
|
|
|
+ if (rc == 0) {
|
|
|
+ cb.processResult(rc, path, p.ctx, rsp
|
|
|
+ .getChildren());
|
|
|
+ } else {
|
|
|
+ cb.processResult(rc, path, p.ctx, null);
|
|
|
+ }
|
|
|
+ } else if (p.response instanceof CreateResponse) {
|
|
|
+ StringCallback cb = (StringCallback) p.cb;
|
|
|
+ CreateResponse rsp = (CreateResponse) p.response;
|
|
|
+ if (rc == 0) {
|
|
|
+ cb.processResult(rc, path, p.ctx, rsp
|
|
|
+ .getPath());
|
|
|
+ } else {
|
|
|
+ cb.processResult(rc, path, p.ctx, null);
|
|
|
+ }
|
|
|
+ } else if (p.cb instanceof VoidCallback) {
|
|
|
+ VoidCallback cb = (VoidCallback) p.cb;
|
|
|
+ cb.processResult(rc, path, p.ctx);
|
|
|
}
|
|
|
- } else if (p.cb instanceof VoidCallback) {
|
|
|
- VoidCallback cb = (VoidCallback) p.cb;
|
|
|
- cb.processResult(rc, path, p.ctx);
|
|
|
}
|
|
|
+ } catch (Throwable t) {
|
|
|
+ LOG.error("Caught unexpected throwable", t);
|
|
|
}
|
|
|
}
|
|
|
} catch (InterruptedException e) {
|
|
@@ -504,15 +513,6 @@ public class ClientCnxn {
|
|
|
sessionPasswd = conRsp.getPasswd();
|
|
|
eventThread.queueEvent(new WatchedEvent(Watcher.Event.EventType.None,
|
|
|
Watcher.Event.KeeperState.SyncConnected, null));
|
|
|
- if (!disableAutoWatchReset) {
|
|
|
- SetWatches sw = new SetWatches(lastZxid,
|
|
|
- zooKeeper.getDataWatches(),
|
|
|
- zooKeeper.getExistWatches(),
|
|
|
- zooKeeper.getChildWatches());
|
|
|
- RequestHeader h = new RequestHeader();
|
|
|
- h.setType(ZooDefs.OpCode.setWatches);
|
|
|
- queuePacket(h, new ReplyHeader(), sw, null, null, null, null, null);
|
|
|
- }
|
|
|
}
|
|
|
|
|
|
void readResponse() throws IOException {
|
|
@@ -702,6 +702,20 @@ public class ClientCnxn {
|
|
|
bb.putInt(bb.capacity() - 4);
|
|
|
bb.rewind();
|
|
|
synchronized (outgoingQueue) {
|
|
|
+ // We add backwards since we are pushing into the front
|
|
|
+ if (!disableAutoWatchReset) {
|
|
|
+ SetWatches sw = new SetWatches(lastZxid,
|
|
|
+ zooKeeper.getDataWatches(),
|
|
|
+ zooKeeper.getExistWatches(),
|
|
|
+ zooKeeper.getChildWatches());
|
|
|
+ RequestHeader h = new RequestHeader();
|
|
|
+ h.setType(ZooDefs.OpCode.setWatches);
|
|
|
+ h.setXid(-8);
|
|
|
+ Packet packet = new Packet(h, new ReplyHeader(), sw, null, null,
|
|
|
+ null);
|
|
|
+ outgoingQueue.addFirst(packet);
|
|
|
+ }
|
|
|
+
|
|
|
for (AuthData id : authInfo) {
|
|
|
outgoingQueue.addFirst(new Packet(new RequestHeader(-4,
|
|
|
OpCode.auth), null, new AuthPacket(0, id.scheme,
|