|
@@ -451,6 +451,9 @@ public class ClientCnxn {
|
|
|
*/
|
|
|
private volatile KeeperState sessionState = KeeperState.Disconnected;
|
|
|
|
|
|
+ private volatile boolean wasKilled = false;
|
|
|
+ private volatile boolean isRunning = false;
|
|
|
+
|
|
|
EventThread() {
|
|
|
super(makeThreadName("-EventThread"));
|
|
|
setUncaughtExceptionHandler(uncaughtExceptionHandler);
|
|
@@ -473,9 +476,16 @@ public class ClientCnxn {
|
|
|
waitingEvents.add(pair);
|
|
|
}
|
|
|
|
|
|
- public void queuePacket(Packet packet) {
|
|
|
- waitingEvents.add(packet);
|
|
|
- }
|
|
|
+ public void queuePacket(Packet packet) {
|
|
|
+ if (wasKilled) {
|
|
|
+ synchronized (waitingEvents) {
|
|
|
+ if (isRunning) waitingEvents.add(packet);
|
|
|
+ else processEvent(packet);
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ waitingEvents.add(packet);
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
public void queueEventOfDeath() {
|
|
|
waitingEvents.add(eventOfDeath);
|
|
@@ -483,119 +493,131 @@ public class ClientCnxn {
|
|
|
|
|
|
@Override
|
|
|
public void run() {
|
|
|
- try {
|
|
|
- while (true) {
|
|
|
- Object event = waitingEvents.take();
|
|
|
- try {
|
|
|
- if (event == eventOfDeath) {
|
|
|
- return;
|
|
|
- }
|
|
|
-
|
|
|
- if (event instanceof WatcherSetEventPair) {
|
|
|
- // each watcher will process the event
|
|
|
- WatcherSetEventPair pair = (WatcherSetEventPair) event;
|
|
|
- for (Watcher watcher : pair.watchers) {
|
|
|
- try {
|
|
|
- watcher.process(pair.event);
|
|
|
- } catch (Throwable t) {
|
|
|
- LOG.error("Error while calling watcher ", t);
|
|
|
- }
|
|
|
- }
|
|
|
- } else {
|
|
|
- Packet p = (Packet) event;
|
|
|
- int rc = 0;
|
|
|
- String clientPath = p.clientPath;
|
|
|
- if (p.replyHeader.getErr() != 0) {
|
|
|
- rc = p.replyHeader.getErr();
|
|
|
- }
|
|
|
- if (p.cb == null) {
|
|
|
- LOG.warn("Somehow a null cb got to EventThread!");
|
|
|
- } else if (p.response instanceof ExistsResponse
|
|
|
- || p.response instanceof SetDataResponse
|
|
|
- || p.response instanceof SetACLResponse) {
|
|
|
- StatCallback cb = (StatCallback) p.cb;
|
|
|
- if (rc == 0) {
|
|
|
- if (p.response instanceof ExistsResponse) {
|
|
|
- cb.processResult(rc, clientPath, p.ctx,
|
|
|
- ((ExistsResponse) p.response)
|
|
|
- .getStat());
|
|
|
- } else if (p.response instanceof SetDataResponse) {
|
|
|
- cb.processResult(rc, clientPath, p.ctx,
|
|
|
- ((SetDataResponse) p.response)
|
|
|
- .getStat());
|
|
|
- } else if (p.response instanceof SetACLResponse) {
|
|
|
- cb.processResult(rc, clientPath, p.ctx,
|
|
|
- ((SetACLResponse) p.response)
|
|
|
- .getStat());
|
|
|
- }
|
|
|
- } else {
|
|
|
- cb.processResult(rc, clientPath, p.ctx, null);
|
|
|
- }
|
|
|
- } else if (p.response instanceof GetDataResponse) {
|
|
|
- DataCallback cb = (DataCallback) p.cb;
|
|
|
- GetDataResponse rsp = (GetDataResponse) p.response;
|
|
|
- if (rc == 0) {
|
|
|
- cb.processResult(rc, clientPath, p.ctx, rsp
|
|
|
- .getData(), rsp.getStat());
|
|
|
- } else {
|
|
|
- cb.processResult(rc, clientPath, p.ctx, null,
|
|
|
- null);
|
|
|
- }
|
|
|
- } else if (p.response instanceof GetACLResponse) {
|
|
|
- ACLCallback cb = (ACLCallback) p.cb;
|
|
|
- GetACLResponse rsp = (GetACLResponse) p.response;
|
|
|
- if (rc == 0) {
|
|
|
- cb.processResult(rc, clientPath, p.ctx, rsp
|
|
|
- .getAcl(), rsp.getStat());
|
|
|
- } else {
|
|
|
- cb.processResult(rc, clientPath, p.ctx, null,
|
|
|
- null);
|
|
|
- }
|
|
|
- } else if (p.response instanceof GetChildrenResponse) {
|
|
|
- ChildrenCallback cb = (ChildrenCallback) p.cb;
|
|
|
- GetChildrenResponse rsp = (GetChildrenResponse) p.response;
|
|
|
- if (rc == 0) {
|
|
|
- cb.processResult(rc, clientPath, p.ctx, rsp
|
|
|
- .getChildren());
|
|
|
- } else {
|
|
|
- cb.processResult(rc, clientPath, p.ctx, null);
|
|
|
- }
|
|
|
- } else if (p.response instanceof GetChildren2Response) {
|
|
|
- Children2Callback cb = (Children2Callback) p.cb;
|
|
|
- GetChildren2Response rsp = (GetChildren2Response) p.response;
|
|
|
- if (rc == 0) {
|
|
|
- cb.processResult(rc, clientPath, p.ctx, rsp
|
|
|
- .getChildren(), rsp.getStat());
|
|
|
- } else {
|
|
|
- cb.processResult(rc, clientPath, p.ctx, null, null);
|
|
|
- }
|
|
|
- } else if (p.response instanceof CreateResponse) {
|
|
|
- StringCallback cb = (StringCallback) p.cb;
|
|
|
- CreateResponse rsp = (CreateResponse) p.response;
|
|
|
- if (rc == 0) {
|
|
|
- cb.processResult(rc, clientPath, p.ctx,
|
|
|
- (chrootPath == null
|
|
|
- ? rsp.getPath()
|
|
|
- : rsp.getPath()
|
|
|
- .substring(chrootPath.length())));
|
|
|
- } else {
|
|
|
- cb.processResult(rc, clientPath, p.ctx, null);
|
|
|
- }
|
|
|
- } else if (p.cb instanceof VoidCallback) {
|
|
|
- VoidCallback cb = (VoidCallback) p.cb;
|
|
|
- cb.processResult(rc, clientPath, p.ctx);
|
|
|
- }
|
|
|
- }
|
|
|
- } catch (Throwable t) {
|
|
|
- LOG.error("Caught unexpected throwable", t);
|
|
|
+ try {
|
|
|
+ isRunning = true;
|
|
|
+ while (true) {
|
|
|
+ Object event = waitingEvents.take();
|
|
|
+ if (event == eventOfDeath) {
|
|
|
+ wasKilled = true;
|
|
|
+ } else {
|
|
|
+ processEvent(event);
|
|
|
+ }
|
|
|
+ if (wasKilled)
|
|
|
+ synchronized (waitingEvents) {
|
|
|
+ if (waitingEvents.isEmpty()) {
|
|
|
+ isRunning = false;
|
|
|
+ break;
|
|
|
+ }
|
|
|
}
|
|
|
- }
|
|
|
- } catch (InterruptedException e) {
|
|
|
- LOG.error("Event thread exiting due to interruption", e);
|
|
|
- }
|
|
|
+ }
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ LOG.error("Event thread exiting due to interruption", e);
|
|
|
+ }
|
|
|
|
|
|
LOG.info("EventThread shut down");
|
|
|
}
|
|
|
+
|
|
|
+ private void processEvent(Object event) {
|
|
|
+ try {
|
|
|
+ if (event instanceof WatcherSetEventPair) {
|
|
|
+ // each watcher will process the event
|
|
|
+ WatcherSetEventPair pair = (WatcherSetEventPair) event;
|
|
|
+ for (Watcher watcher : pair.watchers) {
|
|
|
+ try {
|
|
|
+ watcher.process(pair.event);
|
|
|
+ } catch (Throwable t) {
|
|
|
+ LOG.error("Error while calling watcher ", t);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ Packet p = (Packet) event;
|
|
|
+ int rc = 0;
|
|
|
+ String clientPath = p.clientPath;
|
|
|
+ if (p.replyHeader.getErr() != 0) {
|
|
|
+ rc = p.replyHeader.getErr();
|
|
|
+ }
|
|
|
+ if (p.cb == null) {
|
|
|
+ LOG.warn("Somehow a null cb got to EventThread!");
|
|
|
+ } else if (p.response instanceof ExistsResponse
|
|
|
+ || p.response instanceof SetDataResponse
|
|
|
+ || p.response instanceof SetACLResponse) {
|
|
|
+ StatCallback cb = (StatCallback) p.cb;
|
|
|
+ if (rc == 0) {
|
|
|
+ if (p.response instanceof ExistsResponse) {
|
|
|
+ cb.processResult(rc, clientPath, p.ctx,
|
|
|
+ ((ExistsResponse) p.response)
|
|
|
+ .getStat());
|
|
|
+ } else if (p.response instanceof SetDataResponse) {
|
|
|
+ cb.processResult(rc, clientPath, p.ctx,
|
|
|
+ ((SetDataResponse) p.response)
|
|
|
+ .getStat());
|
|
|
+ } else if (p.response instanceof SetACLResponse) {
|
|
|
+ cb.processResult(rc, clientPath, p.ctx,
|
|
|
+ ((SetACLResponse) p.response)
|
|
|
+ .getStat());
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ cb.processResult(rc, clientPath, p.ctx, null);
|
|
|
+ }
|
|
|
+ } else if (p.response instanceof GetDataResponse) {
|
|
|
+ DataCallback cb = (DataCallback) p.cb;
|
|
|
+ GetDataResponse rsp = (GetDataResponse) p.response;
|
|
|
+ if (rc == 0) {
|
|
|
+ cb.processResult(rc, clientPath, p.ctx, rsp
|
|
|
+ .getData(), rsp.getStat());
|
|
|
+ } else {
|
|
|
+ cb.processResult(rc, clientPath, p.ctx, null,
|
|
|
+ null);
|
|
|
+ }
|
|
|
+ } else if (p.response instanceof GetACLResponse) {
|
|
|
+ ACLCallback cb = (ACLCallback) p.cb;
|
|
|
+ GetACLResponse rsp = (GetACLResponse) p.response;
|
|
|
+ if (rc == 0) {
|
|
|
+ cb.processResult(rc, clientPath, p.ctx, rsp
|
|
|
+ .getAcl(), rsp.getStat());
|
|
|
+ } else {
|
|
|
+ cb.processResult(rc, clientPath, p.ctx, null,
|
|
|
+ null);
|
|
|
+ }
|
|
|
+ } else if (p.response instanceof GetChildrenResponse) {
|
|
|
+ ChildrenCallback cb = (ChildrenCallback) p.cb;
|
|
|
+ GetChildrenResponse rsp = (GetChildrenResponse) p.response;
|
|
|
+ if (rc == 0) {
|
|
|
+ cb.processResult(rc, clientPath, p.ctx, rsp
|
|
|
+ .getChildren());
|
|
|
+ } else {
|
|
|
+ cb.processResult(rc, clientPath, p.ctx, null);
|
|
|
+ }
|
|
|
+ } else if (p.response instanceof GetChildren2Response) {
|
|
|
+ Children2Callback cb = (Children2Callback) p.cb;
|
|
|
+ GetChildren2Response rsp = (GetChildren2Response) p.response;
|
|
|
+ if (rc == 0) {
|
|
|
+ cb.processResult(rc, clientPath, p.ctx, rsp
|
|
|
+ .getChildren(), rsp.getStat());
|
|
|
+ } else {
|
|
|
+ cb.processResult(rc, clientPath, p.ctx, null, null);
|
|
|
+ }
|
|
|
+ } else if (p.response instanceof CreateResponse) {
|
|
|
+ StringCallback cb = (StringCallback) p.cb;
|
|
|
+ CreateResponse rsp = (CreateResponse) p.response;
|
|
|
+ if (rc == 0) {
|
|
|
+ cb.processResult(rc, clientPath, p.ctx,
|
|
|
+ (chrootPath == null
|
|
|
+ ? rsp.getPath()
|
|
|
+ : rsp.getPath()
|
|
|
+ .substring(chrootPath.length())));
|
|
|
+ } else {
|
|
|
+ cb.processResult(rc, clientPath, p.ctx, null);
|
|
|
+ }
|
|
|
+ } else if (p.cb instanceof VoidCallback) {
|
|
|
+ VoidCallback cb = (VoidCallback) p.cb;
|
|
|
+ cb.processResult(rc, clientPath, p.ctx);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } catch (Throwable t) {
|
|
|
+ LOG.error("Caught unexpected throwable", t);
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
private void finishPacket(Packet p) {
|
|
@@ -1243,9 +1265,9 @@ public class ClientCnxn {
|
|
|
}
|
|
|
|
|
|
public void close() {
|
|
|
- zooKeeper.state = States.CLOSED;
|
|
|
synchronized (this) {
|
|
|
- selector.wakeup();
|
|
|
+ zooKeeper.state = States.CLOSED;
|
|
|
+ selector.wakeup();
|
|
|
}
|
|
|
}
|
|
|
}
|