瀏覽代碼

ZOOKEEPER-3652: Synchronize ClientCnxn outgoing queue flush on a stable internal value

When packets are added to ClientCnxn's outgoing packet queue we ensure there's no conflict with an ongoing flush of that queue because of connection loss.

Synchronization used to be on the state field's value. This value is both not stable (its value changes over time), possibly causing improper synchronization, and global, which can cause contention in applications that run several ZooKeeper clients.

We now synchronize on outgoingQueue which is both local to a ClientCnxn's instance and stable.

Author: Sylvain Wallez <sylvain@bluxte.net>

Reviewers: maoling <maoling@apache.org>, Mohammad Arshad <arshad@apache.org>

Closes #1257 from swallez/ZOOKEEPER-3652 and squashes the following commits:

82e2cad2c [Sylvain Wallez] Instruct SpotBugs that we know what we're doing when synchronizing on outgoingQueue
b0bc03d6f [Sylvain Wallez] ZOOKEEPER-3652: Synchronize ClientCnxn outgoing queue flush on a stable internal value
Sylvain Wallez 3 年之前
父節點
當前提交
91e0520133
共有 1 個文件被更改,包括 4 次插入2 次删除
  1. 4 2
      zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java

+ 4 - 2
zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java

@@ -1175,6 +1175,7 @@ public class ClientCnxn {
         }
         }
 
 
         @Override
         @Override
+        @SuppressFBWarnings("JLM_JSR166_UTILCONCURRENT_MONITORENTER")
         public void run() {
         public void run() {
             clientCnxnSocket.introduce(this, sessionId, outgoingQueue);
             clientCnxnSocket.introduce(this, sessionId, outgoingQueue);
             clientCnxnSocket.updateNow();
             clientCnxnSocket.updateNow();
@@ -1303,7 +1304,7 @@ public class ClientCnxn {
                 }
                 }
             }
             }
 
 
-            synchronized (state) {
+            synchronized (outgoingQueue) {
                 // When it comes to this point, it guarantees that later queued
                 // When it comes to this point, it guarantees that later queued
                 // packet to outgoingQueue will be notified of death.
                 // packet to outgoingQueue will be notified of death.
                 cleanup();
                 cleanup();
@@ -1645,6 +1646,7 @@ public class ClientCnxn {
         return queuePacket(h, r, request, response, cb, clientPath, serverPath, ctx, watchRegistration, null);
         return queuePacket(h, r, request, response, cb, clientPath, serverPath, ctx, watchRegistration, null);
     }
     }
 
 
+    @SuppressFBWarnings("JLM_JSR166_UTILCONCURRENT_MONITORENTER")
     public Packet queuePacket(
     public Packet queuePacket(
         RequestHeader h,
         RequestHeader h,
         ReplyHeader r,
         ReplyHeader r,
@@ -1671,7 +1673,7 @@ public class ClientCnxn {
         // 1. synchronize with the final cleanup() in SendThread.run() to avoid race
         // 1. synchronize with the final cleanup() in SendThread.run() to avoid race
         // 2. synchronized against each packet. So if a closeSession packet is added,
         // 2. synchronized against each packet. So if a closeSession packet is added,
         // later packet will be notified.
         // later packet will be notified.
-        synchronized (state) {
+        synchronized (outgoingQueue) {
             if (!state.isAlive() || closing) {
             if (!state.isAlive() || closing) {
                 conLossPacket(packet);
                 conLossPacket(packet);
             } else {
             } else {