Browse Source

ZOOKEEPER-3072: Throttle race condition fix

Making the throttle check before passing over the request to the next thread will prevent the possibility of throttling code running after unthrottle

Added an additional async hammer thread which is pretty reliably reproduces the race condition. The globalOutstandingLimit is decreased so throttling code is executed.

Author: Botond Hejj <botond.hejj@gmail.com>

Reviewers: Andor Molnár <andor@apache.org>, Norbert Kalmar <nkalmar@yahoo.com>, Benjamin Reed <breed@apache.org>

Closes #563 from bothejjms/ZOOKEEPER-3072
Botond Hejj 7 years ago
parent
commit
2a372fcdce
1 changed files with 14 additions and 16 deletions
  1. 14 16
      src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java

+ 14 - 16
src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java

@@ -1130,24 +1130,22 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
                 cnxn.disableRecv();
             }
             return;
+        } else if (h.getType() == OpCode.sasl) {
+            Record rsp = processSasl(incomingBuffer,cnxn);
+            ReplyHeader rh = new ReplyHeader(h.getXid(), 0, KeeperException.Code.OK.intValue());
+            cnxn.sendResponse(rh,rsp, "response"); // not sure about 3rd arg..what is it?
+            return;
         } else {
-            if (h.getType() == OpCode.sasl) {
-                Record rsp = processSasl(incomingBuffer,cnxn);
-                ReplyHeader rh = new ReplyHeader(h.getXid(), 0, KeeperException.Code.OK.intValue());
-                cnxn.sendResponse(rh,rsp, "response"); // not sure about 3rd arg..what is it?
-                return;
-            }
-            else {
-                Request si = new Request(cnxn, cnxn.getSessionId(), h.getXid(),
-                  h.getType(), incomingBuffer, cnxn.getAuthInfo());
-                si.setOwner(ServerCnxn.me);
-                // Always treat packet from the client as a possible
-                // local request.
-                setLocalSessionFlag(si);
-                submitRequest(si);
-            }
+            cnxn.incrOutstandingRequests(h);
+            Request si = new Request(cnxn, cnxn.getSessionId(), h.getXid(),
+              h.getType(), incomingBuffer, cnxn.getAuthInfo());
+            si.setOwner(ServerCnxn.me);
+            // Always treat packet from the client as a possible
+            // local request.
+            setLocalSessionFlag(si);
+            submitRequest(si);
+            return;
         }
-        cnxn.incrOutstandingRequests(h);
     }
 
     private Record processSasl(ByteBuffer incomingBuffer, ServerCnxn cnxn) throws IOException {