|
@@ -1233,9 +1233,11 @@ public class ClientCnxn {
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
- // When it comes to this point, it guarantees that later queued packet to outgoingQueue will be
|
|
|
- // notified of death.
|
|
|
- cleanup();
|
|
|
+ synchronized (state) {
|
|
|
+ // When it comes to this point, it guarantees that later queued
|
|
|
+ // packet to outgoingQueue will be notified of death.
|
|
|
+ cleanup();
|
|
|
+ }
|
|
|
clientCnxnSocket.close();
|
|
|
if (state.isAlive()) {
|
|
|
eventThread.queueEvent(new WatchedEvent(Event.EventType.None,
|
|
@@ -1515,15 +1517,21 @@ public class ClientCnxn {
|
|
|
packet.clientPath = clientPath;
|
|
|
packet.serverPath = serverPath;
|
|
|
packet.watchDeregistration = watchDeregistration;
|
|
|
- if (!state.isAlive() || closing) {
|
|
|
- conLossPacket(packet);
|
|
|
- } else {
|
|
|
- // If the client is asking to close the session then
|
|
|
- // mark as closing
|
|
|
- if (h.getType() == OpCode.closeSession) {
|
|
|
- closing = true;
|
|
|
+ // The synchronized block here is for two purpose:
|
|
|
+ // 1. synchronize with the final cleanup() in SendThread.run() to avoid race
|
|
|
+ // 2. synchronized against each packet. So if a closeSession packet is added,
|
|
|
+ // later packet will be notified.
|
|
|
+ synchronized (state) {
|
|
|
+ if (!state.isAlive() || closing) {
|
|
|
+ conLossPacket(packet);
|
|
|
+ } else {
|
|
|
+ // If the client is asking to close the session then
|
|
|
+ // mark as closing
|
|
|
+ if (h.getType() == OpCode.closeSession) {
|
|
|
+ closing = true;
|
|
|
+ }
|
|
|
+ outgoingQueue.add(packet);
|
|
|
}
|
|
|
- outgoingQueue.add(packet);
|
|
|
}
|
|
|
sendThread.getClientCnxnSocket().packetAdded();
|
|
|
return packet;
|