瀏覽代碼

ZOOKEEPER-3243: Add server-side request throttling

Author: Jie Huang <jiehuang@fb.com>
Author: Joseph Blomstedt <jdb@fb.com>

Reviewers: Michael Han <hanm@apache.org>

Closes #986 from jhuan31/ZOOKEEPER-3243
Jie Huang 6 年之前
父節點
當前提交
7b3de52cdb

+ 45 - 15
zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md

@@ -229,7 +229,7 @@ ensemble:
 7. If your configuration file is set up, you can start a
 7. If your configuration file is set up, you can start a
   ZooKeeper server:  
   ZooKeeper server:  
 
 
-        $ java -cp zookeeper.jar:lib/*:conf org.apache.zookeeper.server.quorum.QuorumPeerMain zoo.conf 
+        $ java -cp zookeeper.jar:lib/*:conf org.apache.zookeeper.server.quorum.QuorumPeerMain zoo.conf
 
 
   QuorumPeerMain starts a ZooKeeper server,
   QuorumPeerMain starts a ZooKeeper server,
   [JMX](http://java.sun.com/javase/technologies/core/mntr-mgmt/javamanagement/)
   [JMX](http://java.sun.com/javase/technologies/core/mntr-mgmt/javamanagement/)
@@ -833,7 +833,7 @@ property, when available, is noted below.
 
 
 * *serverCnxnFactory* :
 * *serverCnxnFactory* :
     (Java system property: **zookeeper.serverCnxnFactory**)
     (Java system property: **zookeeper.serverCnxnFactory**)
-    Specifies ServerCnxnFactory implementation. 
+    Specifies ServerCnxnFactory implementation.
     This should be set to `NettyServerCnxnFactory` in order to use TLS based server communication.
     This should be set to `NettyServerCnxnFactory` in order to use TLS based server communication.
     Default is `NIOServerCnxnFactory`.
     Default is `NIOServerCnxnFactory`.
 
 
@@ -857,6 +857,36 @@ property, when available, is noted below.
     Does not affect the limit defined by *flushDelay*.
     Does not affect the limit defined by *flushDelay*.
     Default is 1000.
     Default is 1000.
 
 
+* *requestThrottleLimit* :
+    (Java system property: **zookeeper.request_throttle_max_requests**)
+    **New in 3.6.0:**
+    The total number of outstanding requests allowed before the RequestThrottler starts stalling. When set to 0, throttling is disabled. The default is 0.
+
+* *requestThrottleStallTime* :
+    (Java system property: **zookeeper.request_throttle_stall_time**)
+    **New in 3.6.0:**
+    The maximum time (in milliseconds) for which a thread may wait to be notified that it may proceed processing a request. The default is 100.
+
+* *requestThrottleDropStale* :
+    (Java system property: **request_throttle_drop_stale**)
+    **New in 3.6.0:**
+    When enabled, the throttler will drop stale requests rather than issue them to the request pipeline. A stale request is a request sent by a connection that is now closed, and/or a request that will have a  request latency higher than the sessionTimeout. The default is true.
+
+* *requestStaleLatencyCheck* :
+    (Java system property: **zookeeper.request_stale_latency_check**)
+    **New in 3.6.0:**
+    When enabled, a request is considered stale if the request latency is higher than its associated session timeout. Disabled by default.
+
+* *requestStaleConnectionCheck* :
+    (Java system property: **zookeeper.request_stale_connection_check**)
+    **New in 3.6.0:**
+    When enabled, a request is considered stale if the request's connection has closed. Enabled by default.
+
+* *zookeeper.request_throttler.shutdownTimeout* :
+    (Java system property only)
+    **New in 3.6.0:**
+    The time (in milliseconds) the RequestThrottler waits for the request queue to drain during shutdown before it shuts down forcefully. The default is 10000.  
+
 <a name="sc_clusterOptions"></a>
 <a name="sc_clusterOptions"></a>
 
 
 #### Cluster Options
 #### Cluster Options
@@ -1108,20 +1138,20 @@ encryption/authentication/authorization performed by the service.
     (Java system property: **zookeeper.sslQuorum**)
     (Java system property: **zookeeper.sslQuorum**)
     **New in 3.5.5:**
     **New in 3.5.5:**
     Enables encrypted quorum communication. Default is `false`.
     Enables encrypted quorum communication. Default is `false`.
-       
+
 * *ssl.keyStore.location and ssl.keyStore.password* and *ssl.quorum.keyStore.location* and *ssl.quorum.keyStore.password* :
 * *ssl.keyStore.location and ssl.keyStore.password* and *ssl.quorum.keyStore.location* and *ssl.quorum.keyStore.password* :
     (Java system properties: **zookeeper.ssl.keyStore.location** and **zookeeper.ssl.keyStore.password** and **zookeeper.ssl.quorum.keyStore.location** and **zookeeper.ssl.quorum.keyStore.password**)
     (Java system properties: **zookeeper.ssl.keyStore.location** and **zookeeper.ssl.keyStore.password** and **zookeeper.ssl.quorum.keyStore.location** and **zookeeper.ssl.quorum.keyStore.password**)
     **New in 3.5.5:**
     **New in 3.5.5:**
     Specifies the file path to a Java keystore containing the local
     Specifies the file path to a Java keystore containing the local
     credentials to be used for client and quorum TLS connections, and the
     credentials to be used for client and quorum TLS connections, and the
     password to unlock the file.
     password to unlock the file.
-    
+
 * *ssl.keyStore.type* and *ssl.quorum.keyStore.type* :
 * *ssl.keyStore.type* and *ssl.quorum.keyStore.type* :
     (Java system properties: **zookeeper.ssl.keyStore.type** and **zookeeper.ssl.quorum.keyStore.type**)
     (Java system properties: **zookeeper.ssl.keyStore.type** and **zookeeper.ssl.quorum.keyStore.type**)
     **New in 3.5.5:**
     **New in 3.5.5:**
     Specifies the file format of client and quorum keystores. Values: JKS, PEM or null (detect by filename).    
     Specifies the file format of client and quorum keystores. Values: JKS, PEM or null (detect by filename).    
     Default: null     
     Default: null     
-    
+
 * *ssl.trustStore.location* and *ssl.trustStore.password* and *ssl.quorum.trustStore.location* and *ssl.quorum.trustStore.password* :
 * *ssl.trustStore.location* and *ssl.trustStore.password* and *ssl.quorum.trustStore.location* and *ssl.quorum.trustStore.password* :
     (Java system properties: **zookeeper.ssl.trustStore.location** and **zookeeper.ssl.trustStore.password** and **zookeeper.ssl.quorum.trustStore.location** and **zookeeper.ssl.quorum.trustStore.password**)
     (Java system properties: **zookeeper.ssl.trustStore.location** and **zookeeper.ssl.trustStore.password** and **zookeeper.ssl.quorum.trustStore.location** and **zookeeper.ssl.quorum.trustStore.password**)
     **New in 3.5.5:**
     **New in 3.5.5:**
@@ -1146,7 +1176,7 @@ encryption/authentication/authorization performed by the service.
     **New in 3.5.5:**
     **New in 3.5.5:**
     Specifies the enabled protocols in client and quorum TLS negotiation.
     Specifies the enabled protocols in client and quorum TLS negotiation.
     Default: value of `protocol` property
     Default: value of `protocol` property
-    
+
 * *ssl.ciphersuites* and *ssl.quorum.ciphersuites* :
 * *ssl.ciphersuites* and *ssl.quorum.ciphersuites* :
     (Java system properties: **zookeeper.ssl.ciphersuites** and **zookeeper.ssl.quorum.ciphersuites**)
     (Java system properties: **zookeeper.ssl.ciphersuites** and **zookeeper.ssl.quorum.ciphersuites**)
     **New in 3.5.5:**
     **New in 3.5.5:**
@@ -1161,7 +1191,7 @@ encryption/authentication/authorization performed by the service.
     1. Use hardware keystore, loaded in using PKCS11 or something similar.
     1. Use hardware keystore, loaded in using PKCS11 or something similar.
     2. You don't have access to the software keystore, but can retrieve an already-constructed SSLContext from their container.
     2. You don't have access to the software keystore, but can retrieve an already-constructed SSLContext from their container.
     Default: null
     Default: null
-    
+
 * *ssl.hostnameVerification* and *ssl.quorum.hostnameVerification* :
 * *ssl.hostnameVerification* and *ssl.quorum.hostnameVerification* :
     (Java system properties: **zookeeper.ssl.hostnameVerification** and **zookeeper.ssl.quorum.hostnameVerification**)
     (Java system properties: **zookeeper.ssl.hostnameVerification** and **zookeeper.ssl.quorum.hostnameVerification**)
     **New in 3.5.5:**
     **New in 3.5.5:**
@@ -1180,12 +1210,12 @@ encryption/authentication/authorization performed by the service.
     **New in 3.5.5:**
     **New in 3.5.5:**
     Specifies whether Online Certificate Status Protocol is enabled in client and quorum TLS protocols.
     Specifies whether Online Certificate Status Protocol is enabled in client and quorum TLS protocols.
     Default: false
     Default: false
-    
+
 * *ssl.clientAuth* and *ssl.quorum.clientAuth* :
 * *ssl.clientAuth* and *ssl.quorum.clientAuth* :
     (Java system properties: **zookeeper.ssl.clientAuth** and **zookeeper.ssl.quorum.clientAuth**)
     (Java system properties: **zookeeper.ssl.clientAuth** and **zookeeper.ssl.quorum.clientAuth**)
     **New in 3.5.5:**
     **New in 3.5.5:**
     TBD
     TBD
-    
+
 * *ssl.handshakeDetectionTimeoutMillis* and *ssl.quorum.handshakeDetectionTimeoutMillis* :
 * *ssl.handshakeDetectionTimeoutMillis* and *ssl.quorum.handshakeDetectionTimeoutMillis* :
     (Java system properties: **zookeeper.ssl.handshakeDetectionTimeoutMillis** and **zookeeper.ssl.quorum.handshakeDetectionTimeoutMillis**)
     (Java system properties: **zookeeper.ssl.handshakeDetectionTimeoutMillis** and **zookeeper.ssl.quorum.handshakeDetectionTimeoutMillis**)
     **New in 3.5.5:**
     **New in 3.5.5:**
@@ -1468,20 +1498,20 @@ and quorum communication protocols.
 
 
 One keystore should be created for each ZK instance.
 One keystore should be created for each ZK instance.
 
 
-In this example we generate a self-signed certificate and store it 
-together with the private key in `keystore.jks`. This is suitable for 
-testing purposes, but you probably need an official certificate to sign 
+In this example we generate a self-signed certificate and store it
+together with the private key in `keystore.jks`. This is suitable for
+testing purposes, but you probably need an official certificate to sign
 your keys in a production environment.
 your keys in a production environment.
 
 
 Please note that the alias (`-alias`) and the distinguished name (`-dname`)
 Please note that the alias (`-alias`) and the distinguished name (`-dname`)
-must match the hostname of the machine that is associated with, otherwise 
+must match the hostname of the machine that is associated with, otherwise
 hostname verification won't work.
 hostname verification won't work.
 
 
 ```
 ```
 keytool -genkeypair -alias $(hostname -f) -keyalg RSA -keysize 2048 -dname "cn=$(hostname -f)" -keypass password -keystore keystore.jks -storepass password
 keytool -genkeypair -alias $(hostname -f) -keyalg RSA -keysize 2048 -dname "cn=$(hostname -f)" -keypass password -keystore keystore.jks -storepass password
 ```
 ```
 
 
-2. Extract the signed public key (certificate) from keystore 
+2. Extract the signed public key (certificate) from keystore
 
 
 *This step might only necessary for self-signed certificates.*
 *This step might only necessary for self-signed certificates.*
 
 
@@ -1569,7 +1599,7 @@ and do another rolling restart
 ```
 ```
 sslQuorum=true
 sslQuorum=true
 portUnification=false
 portUnification=false
-``` 
+```
 
 
 
 
 <a name="sc_zkCommands"></a>
 <a name="sc_zkCommands"></a>

+ 5 - 0
zookeeper-server/src/main/java/org/apache/zookeeper/server/FinalRequestProcessor.java

@@ -217,6 +217,11 @@ public class FinalRequestProcessor implements RequestProcessor {
             if (LOG.isDebugEnabled()) {
             if (LOG.isDebugEnabled()) {
                 LOG.debug("{}",request);
                 LOG.debug("{}",request);
             }
             }
+
+            if (request.isStale()) {
+                ServerMetrics.getMetrics().STALE_REPLIES.add(1);
+            }
+
             switch (request.type) {
             switch (request.type) {
             case OpCode.ping: {
             case OpCode.ping: {
                 lastOp = "PING";
                 lastOp = "PING";

+ 21 - 12
zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxn.java

@@ -150,17 +150,31 @@ public class NIOServerCnxn extends ServerCnxn {
         requestInterestOpsUpdate();
         requestInterestOpsUpdate();
     }
     }
 
 
+    /**
+     * When read on socket failed, this is typically because client closed the
+     * connection. In most cases, the client does this when the server doesn't
+     * respond within 2/3 of session timeout. This possibly indicates server
+     * health/performance issue, so we need to log and keep track of stat
+     *
+     * @throws EndOfStreamException
+     */
+    private void handleFailedRead() throws EndOfStreamException {
+        setStale();
+        ServerMetrics.getMetrics().CONNECTION_DROP_COUNT.add(1);
+        throw new EndOfStreamException(
+                "Unable to read additional data from client,"
+                + " it probably closed the socket:"
+                + " address = " + sock.socket().getRemoteSocketAddress() + ","
+                + " session = 0x" + Long.toHexString(sessionId),
+                DisconnectReason.UNABLE_TO_READ_FROM_CLIENT);
+    }
+
     /** Read the request payload (everything following the length prefix) */
     /** Read the request payload (everything following the length prefix) */
     private void readPayload() throws IOException, InterruptedException, ClientCnxnLimitException {
     private void readPayload() throws IOException, InterruptedException, ClientCnxnLimitException {
         if (incomingBuffer.remaining() != 0) { // have we read length bytes?
         if (incomingBuffer.remaining() != 0) { // have we read length bytes?
             int rc = sock.read(incomingBuffer); // sock is non-blocking, so ok
             int rc = sock.read(incomingBuffer); // sock is non-blocking, so ok
             if (rc < 0) {
             if (rc < 0) {
-                ServerMetrics.getMetrics().CONNECTION_DROP_COUNT.add(1);
-                throw new EndOfStreamException(
-                        "Unable to read additional data from client sessionid 0x"
-                        + Long.toHexString(sessionId)
-                        + ", likely client has closed socket",
-                        DisconnectReason.UNABLE_TO_READ_FROM_CLIENT);
+                handleFailedRead();
             }
             }
         }
         }
 
 
@@ -318,12 +332,7 @@ public class NIOServerCnxn extends ServerCnxn {
             if (k.isReadable()) {
             if (k.isReadable()) {
                 int rc = sock.read(incomingBuffer);
                 int rc = sock.read(incomingBuffer);
                 if (rc < 0) {
                 if (rc < 0) {
-                    ServerMetrics.getMetrics().CONNECTION_DROP_COUNT.add(1);
-                    throw new EndOfStreamException(
-                            "Unable to read additional data from client sessionid 0x"
-                            + Long.toHexString(sessionId)
-                            + ", likely client has closed socket",
-                            DisconnectReason.UNABLE_TO_READ_FROM_CLIENT);
+                    handleFailedRead();
                 }
                 }
                 if (incomingBuffer.remaining() == 0) {
                 if (incomingBuffer.remaining() == 0) {
                     boolean isPayload;
                     boolean isPayload;

+ 69 - 0
zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java

@@ -39,6 +39,16 @@ import org.apache.zookeeper.txn.TxnHeader;
 public class Request {
 public class Request {
     public final static Request requestOfDeath = new Request(null, 0, 0, 0, null, null);
     public final static Request requestOfDeath = new Request(null, 0, 0, 0, null, null);
 
 
+    // Considers a request stale if the request's connection has closed. Enabled
+    // by default.
+    private static volatile boolean staleConnectionCheck = Boolean.parseBoolean(
+            System.getProperty("zookeeper.request_stale_connection_check","true"));
+
+    // Considers a request stale if the request latency is higher than its
+    // associated session timeout. Disabled by default.
+    private static volatile boolean staleLatencyCheck = Boolean.parseBoolean(
+            System.getProperty("zookeeper.request_stale_latency_check","false"));
+
     public Request(ServerCnxn cnxn, long sessionId, int xid, int type, ByteBuffer bb, List<Id> authInfo) {
     public Request(ServerCnxn cnxn, long sessionId, int xid, int type, ByteBuffer bb, List<Id> authInfo) {
         this.cnxn = cnxn;
         this.cnxn = cnxn;
         this.sessionId = sessionId;
         this.sessionId = sessionId;
@@ -133,6 +143,65 @@ public class Request {
         this.txn = txn;
         this.txn = txn;
     }
     }
 
 
+    public ServerCnxn getConnection() {
+        return cnxn;
+    }
+
+    public static boolean getStaleLatencyCheck() {
+        return staleLatencyCheck;
+    }
+
+    public static void setStaleLatencyCheck(boolean check) {
+        staleLatencyCheck = check;
+    }
+
+    public static boolean getStaleConnectionCheck() {
+        return staleConnectionCheck;
+    }
+
+    public static void setStaleConnectionCheck(boolean check) {
+        staleConnectionCheck = check;
+    }
+
+    public boolean isStale() {
+        if (cnxn == null) {
+            return false;
+        }
+
+        // closeSession requests should be able to outlive the session in order
+        // to clean-up state.
+        if (type == OpCode.closeSession) {
+            return false;
+        }
+
+        if (staleConnectionCheck) {
+            // If the connection is closed, consider the request stale.
+            if (cnxn.isStale() || cnxn.isInvalid()) {
+                return true;
+            }
+        }
+
+        if (staleLatencyCheck) {
+            // If the request latency is higher than session timeout, consider
+            // the request stale.
+            long currentTime = Time.currentElapsedTime();
+            if ((currentTime - createTime) > cnxn.getSessionTimeout()) {
+                return true;
+            }
+        }
+
+        return false;
+    }
+
+    /**
+     * A prior request was dropped on this request's connection and
+     * therefore this request must also be dropped to ensure correct
+     * ordering semantics.
+     */
+    public boolean mustDrop() {
+        return ((cnxn != null) && cnxn.isInvalid());
+    }
+
     /**
     /**
      * is the packet type a valid packet in zookeeper
      * is the packet type a valid packet in zookeeper
      *
      *

+ 267 - 0
zookeeper-server/src/main/java/org/apache/zookeeper/server/RequestThrottler.java

@@ -0,0 +1,267 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server;
+
+import java.util.concurrent.LinkedBlockingQueue;
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.zookeeper.common.Time;
+
+/**
+ * When enabled, the RequestThrottler limits the number of outstanding requests
+ * currently submitted to the request processor pipeline. The throttler augments
+ * the limit imposed by the <code>globalOutstandingLimit</code> that is enforced
+ * by the connection layer ({@link NIOServerCnxn}, {@link NettyServerCnxn}).
+ *
+ * The connection layer limit applies backpressure against the TCP connection by
+ * disabling selection on connections once the request limit is reached. However,
+ * the connection layer always allows a connection to send at least one request
+ * before disabling selection on that connection. Thus, in a scenario with 40000
+ * client connections, the total number of requests inflight may be as high as
+ * 40000 even if the <code>globalOustandingLimit</code> was set lower.
+ *
+ * The RequestThrottler addresses this issue by adding additional queueing. When
+ * enabled, client connections no longer submit requests directly to the request
+ * processor pipeline but instead to the RequestThrottler. The RequestThrottler
+ * is then responsible for issuing requests to the request processors, and
+ * enforces a separate <code>maxRequests</code> limit. If the total number of
+ * outstanding requests is higher than <code>maxRequests</code>, the throttler
+ * will continually stall for <code>stallTime</code> milliseconds until
+ * underlimit.
+ *
+ * The RequestThrottler can also optionally drop stale requests rather than
+ * submit them to the processor pipeline. A stale request is a request sent
+ * by a connection that is already closed, and/or a request whose latency
+ * will end up being higher than its associated session timeout. The notion
+ * of staleness is configurable, @see Request for more details.
+ *
+ * To ensure ordering guarantees, if a request is ever dropped from a connection
+ * that connection is closed and flagged as invalid. All subsequent requests
+ * inflight from that connection are then dropped as well.
+ */
+public class RequestThrottler extends ZooKeeperCriticalThread {
+    private static final Logger LOG = LoggerFactory.getLogger(RequestThrottler.class);
+
+    private final LinkedBlockingQueue<Request> submittedRequests =
+        new LinkedBlockingQueue<Request>();
+
+    private final ZooKeeperServer zks;
+    private volatile boolean stopping;
+    private volatile boolean killed;
+
+    private static final String SHUTDOWN_TIMEOUT = "zookeeper.request_throttler.shutdownTimeout";
+    private static int shutdownTimeout = 10000;
+
+    static {
+        shutdownTimeout = Integer.getInteger(SHUTDOWN_TIMEOUT, 10000);
+        LOG.info("{} = {}", SHUTDOWN_TIMEOUT, shutdownTimeout);
+    }
+
+    /**
+     * The total number of outstanding requests allowed before the throttler
+     * starts stalling.
+     *
+     * When maxRequests = 0, throttling is disabled.
+     */
+    private static volatile int maxRequests =
+        Integer.getInteger("zookeeper.request_throttle_max_requests", 0);
+
+    /**
+     * The time (in milliseconds) this is the maximum time for which throttler
+     * thread may wait to be notified that it may proceed processing a request.
+     */
+    private static volatile int stallTime =
+        Integer.getInteger("zookeeper.request_throttle_stall_time", 100);
+
+    /**
+     * When true, the throttler will drop stale requests rather than issue
+     * them to the request pipeline. A stale request is a request sent by
+     * a connection that is now closed, and/or a request that will have a
+     * request latency higher than the sessionTimeout. The staleness of
+     * a request is tunable property, @see Request for details.
+     */
+    private static volatile boolean dropStaleRequests = Boolean.parseBoolean(
+            System.getProperty("zookeeper.request_throttle_drop_stale", "true"));
+
+    public RequestThrottler(ZooKeeperServer zks) {
+        super("RequestThrottler", zks.getZooKeeperServerListener());
+        this.zks = zks;
+        this.stopping = false;
+        this.killed = false;
+    }
+
+    public static int getMaxRequests() {
+        return maxRequests;
+    }
+
+    public static void setMaxRequests(int requests) {
+        maxRequests = requests;
+    }
+
+    public static int getStallTime() {
+        return stallTime;
+    }
+
+    public static void setStallTime(int time) {
+        stallTime = time;
+    }
+
+    public static boolean getDropStaleRequests() {
+        return dropStaleRequests;
+    }
+
+    public static void setDropStaleRequests(boolean drop) {
+        dropStaleRequests = drop;
+    }
+
+    @Override
+    public void run() {
+        try {
+            while (true) {
+                if (killed) {
+                    break;
+                }
+
+                Request request = submittedRequests.take();
+                if (Request.requestOfDeath == request) {
+                    break;
+                }
+
+                if (request.mustDrop()) {
+                    continue;
+                }
+
+                // Throttling is disabled when maxRequests = 0
+                if (maxRequests > 0) {
+                    while (!killed) {
+                        if (dropStaleRequests && request.isStale()) {
+                            // Note: this will close the connection
+                            dropRequest(request);
+                            ServerMetrics.getMetrics().STALE_REQUESTS_DROPPED.add(1);
+                            request = null;
+                            break;
+                        }
+                        if (zks.getInProcess() < maxRequests) {
+                            break;
+                        }
+                        throttleSleep(stallTime);
+                    }
+                }
+
+                if (killed) {
+                    break;
+                }
+
+                // A dropped stale request will be null
+                if (request != null) {
+                    if (request.isStale()) {
+                        ServerMetrics.getMetrics().STALE_REQUESTS.add(1);
+                    }
+                    zks.submitRequestNow(request);
+                }
+            }
+        } catch (InterruptedException e) {
+            LOG.error("Unexpected interruption", e);
+        }
+        int dropped = drainQueue();
+        LOG.info("RequestThrottler shutdown. Dropped {} requests", dropped);
+    }
+
+    private synchronized void throttleSleep(int stallTime) {
+        try {
+            ServerMetrics.getMetrics().REQUEST_THROTTLE_WAIT_COUNT.add(1);
+            this.wait(stallTime);
+        } catch(InterruptedException ie) {
+            return;
+        }
+    }
+
+    @SuppressFBWarnings(value = "NN_NAKED_NOTIFY",
+            justification = "state change is in ZooKeeperServer.decInProgress() ")
+    public synchronized void throttleWake() {
+        this.notify();
+    }
+
+    private int drainQueue() {
+        // If the throttler shutdown gracefully, the queue will be empty.
+        // However, if the shutdown time limit was reached and the throttler
+        // was killed, we have no other option than to drop all remaining
+        // requests on the floor.
+        int dropped = 0;
+        Request request;
+        LOG.info("Draining request throttler queue");
+        while ((request = submittedRequests.poll()) != null) {
+            dropped += 1;
+            dropRequest(request);
+        }
+        return dropped;
+    }
+
+    private void dropRequest(Request request) {
+        // Since we're dropping a request on the floor, we must mark the
+        // connection as invalid to ensure any future requests from this
+        // connection are also dropped in order to ensure ordering
+        // semantics.
+        ServerCnxn conn = request.getConnection();
+        if (conn != null) {
+            // Note: this will close the connection
+            conn.setInvalid();
+        }
+    }
+
+    public void submitRequest(Request request) {
+        if (stopping) {
+            LOG.debug("Shutdown in progress. Request cannot be processed");
+            dropRequest(request);
+        } else {
+            submittedRequests.add(request);
+        }
+    }
+
+    public int getInflight() {
+        return submittedRequests.size();
+    }
+
+    @SuppressFBWarnings("DM_EXIT")
+    public void shutdown() {
+        // Try to shutdown gracefully
+        LOG.info("Shutting down");
+        stopping = true;
+        submittedRequests.add(Request.requestOfDeath);
+        try {
+            this.join(shutdownTimeout);
+        } catch (InterruptedException e) {
+            LOG.warn("Interrupted while waiting for {} to finish", this);
+        }
+
+        // Forcibly shutdown if necessary in order to ensure request
+        // queue is drained.
+        killed = true;
+        try {
+            this.join();
+        } catch (InterruptedException e) {
+            LOG.warn("Interrupted while waiting for {} to finish", this);
+            //TODO apply ZOOKEEPER-575 and remove this line.
+            System.exit(ExitCode.UNEXPECTED_ERROR.getValue());
+        }
+    }
+}

+ 29 - 2
zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerCnxn.java

@@ -68,8 +68,6 @@ public abstract class ServerCnxn implements Stats, Watcher {
      */
      */
     boolean isOldClient = true;
     boolean isOldClient = true;
 
 
-    private volatile boolean stale = false;
-
     AtomicLong outstandingCount = new AtomicLong();
     AtomicLong outstandingCount = new AtomicLong();
 
 
     /** The ZooKeeperServer for this connection. May be null if the server
     /** The ZooKeeperServer for this connection. May be null if the server
@@ -123,6 +121,22 @@ public abstract class ServerCnxn implements Stats, Watcher {
         this.zkServer = zkServer;
         this.zkServer = zkServer;
     }
     }
 
 
+    /**
+     * Flag that indicates that this connection is known to be closed/closing
+     * and from which we can optionally ignore outstanding requests as part
+     * of request throttling. This flag may be false when a connection is
+     * actually closed (false negative), but should never be true with
+     * a connection is still alive (false positive).
+     */
+    private volatile boolean stale = false;
+
+    /**
+     * Flag that indicates that a request for this connection was previously
+     * dropped as part of request throttling and therefore all future requests
+     * must also be dropped to ensure ordering guarantees.
+     */
+    private volatile boolean invalid = false;
+
     abstract int getSessionTimeout();
     abstract int getSessionTimeout();
 
 
     public void incrOutstandingAndCheckThrottle(RequestHeader h) {
     public void incrOutstandingAndCheckThrottle(RequestHeader h) {
@@ -276,6 +290,19 @@ public abstract class ServerCnxn implements Stats, Watcher {
         stale = true;
         stale = true;
     }
     }
 
 
+    public boolean isInvalid() {
+        return invalid;
+    }
+
+    public void setInvalid() {
+        if (!invalid) {
+            if (!stale) {
+                sendCloseSession();
+            }
+            invalid = true;
+        }
+    }
+
     protected void packetReceived(long bytes) {
     protected void packetReceived(long bytes) {
         incrPacketsReceived();
         incrPacketsReceived();
         ServerStats serverStats = serverStats();
         ServerStats serverStats = serverStats();

+ 10 - 0
zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java

@@ -220,6 +220,11 @@ public final class ServerMetrics {
         ACK_LATENCY = metricsContext.getSummarySet("ack_latency", DetailLevel.ADVANCED);
         ACK_LATENCY = metricsContext.getSummarySet("ack_latency", DetailLevel.ADVANCED);
         PROPOSAL_COUNT = metricsContext.getCounter("proposal_count");
         PROPOSAL_COUNT = metricsContext.getCounter("proposal_count");
         QUIT_LEADING_DUE_TO_DISLOYAL_VOTER = metricsContext.getCounter("quit_leading_due_to_disloyal_voter");
         QUIT_LEADING_DUE_TO_DISLOYAL_VOTER = metricsContext.getCounter("quit_leading_due_to_disloyal_voter");
+
+        STALE_REQUESTS = metricsContext.getCounter("stale_requests");
+        STALE_REQUESTS_DROPPED = metricsContext.getCounter("stale_requests_dropped");
+        STALE_REPLIES = metricsContext.getCounter("stale_replies");
+        REQUEST_THROTTLE_WAIT_COUNT = metricsContext.getCounter("request_throttle_wait_count");
     }
     }
 
 
     /**
     /**
@@ -414,6 +419,11 @@ public final class ServerMetrics {
      */
      */
     public final Counter ENSEMBLE_AUTH_SKIP;
     public final Counter ENSEMBLE_AUTH_SKIP;
 
 
+    public final Counter STALE_REQUESTS;
+    public final Counter STALE_REQUESTS_DROPPED;
+    public final Counter STALE_REPLIES;
+    public final Counter REQUEST_THROTTLE_WAIT_COUNT;
+
     private final MetricsProvider metricsProvider;
     private final MetricsProvider metricsProvider;
 
 
     public void resetAll() {
     public void resetAll() {

+ 60 - 3
zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java

@@ -40,6 +40,7 @@ import java.util.function.BiConsumer;
 
 
 import javax.security.sasl.SaslException;
 import javax.security.sasl.SaslException;
 
 
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import org.apache.jute.BinaryInputArchive;
 import org.apache.jute.BinaryInputArchive;
 import org.apache.jute.BinaryOutputArchive;
 import org.apache.jute.BinaryOutputArchive;
 import org.apache.jute.Record;
 import org.apache.jute.Record;
@@ -79,7 +80,6 @@ import org.apache.zookeeper.txn.TxnHeader;
 import org.slf4j.Logger;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.LoggerFactory;
 
 
-
 /**
 /**
  * This class implements a simple standalone ZooKeeperServer. It sets up the
  * This class implements a simple standalone ZooKeeperServer. It sets up the
  * following chain of RequestProcessors to process requests:
  * following chain of RequestProcessors to process requests:
@@ -187,6 +187,11 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
     // Connection throttling
     // Connection throttling
     private BlueThrottle connThrottle;
     private BlueThrottle connThrottle;
 
 
+    @SuppressFBWarnings(value = "IS2_INCONSISTENT_SYNC",
+            justification = "Internally the throttler has a BlockingQueue so " +
+                    "once the throttler is created and started, it is thread-safe")
+    private RequestThrottler requestThrottler;
+
     void removeCnxn(ServerCnxn cnxn) {
     void removeCnxn(ServerCnxn cnxn) {
         zkDb.removeCnxn(cnxn);
         zkDb.removeCnxn(cnxn);
     }
     }
@@ -250,7 +255,7 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
                            ZKDatabase zkDb, String initialConfig) {
                            ZKDatabase zkDb, String initialConfig) {
         this(txnLogFactory, tickTime, minSessionTimeout, maxSessionTimeout, clientPortListenBacklog, zkDb, initialConfig);
         this(txnLogFactory, tickTime, minSessionTimeout, maxSessionTimeout, clientPortListenBacklog, zkDb, initialConfig);
         this.jvmPauseMonitor = jvmPauseMonitor;
         this.jvmPauseMonitor = jvmPauseMonitor;
-        if(jvmPauseMonitor != null) {
+        if (jvmPauseMonitor != null) {
             LOG.info("Added JvmPauseMonitor to server");
             LOG.info("Added JvmPauseMonitor to server");
         }
         }
     }
     }
@@ -558,6 +563,8 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
         startSessionTracker();
         startSessionTracker();
         setupRequestProcessors();
         setupRequestProcessors();
 
 
+        startRequestThrottler();
+
         registerJMX();
         registerJMX();
 
 
         startJvmPauseMonitor();
         startJvmPauseMonitor();
@@ -574,6 +581,12 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
         }
         }
     }
     }
 
 
+    protected void startRequestThrottler() {
+        requestThrottler = new RequestThrottler(this);
+        requestThrottler.start();
+
+    }
+
     protected void setupRequestProcessors() {
     protected void setupRequestProcessors() {
         RequestProcessor finalProcessor = new FinalRequestProcessor(this);
         RequestProcessor finalProcessor = new FinalRequestProcessor(this);
         RequestProcessor syncProcessor = new SyncRequestProcessor(this,
         RequestProcessor syncProcessor = new SyncRequestProcessor(this,
@@ -674,6 +687,10 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
         // subclasses will do their specific clean up
         // subclasses will do their specific clean up
         unregisterMetrics();
         unregisterMetrics();
 
 
+        if (requestThrottler != null) {
+            requestThrottler.shutdown();
+        }
+
         // Since sessionTracker and syncThreads poll we just have to
         // Since sessionTracker and syncThreads poll we just have to
         // set running to false and they will detect it during the poll
         // set running to false and they will detect it during the poll
         // interval.
         // interval.
@@ -735,12 +752,26 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
 
 
     public void decInProcess() {
     public void decInProcess() {
         requestsInProcess.decrementAndGet();
         requestsInProcess.decrementAndGet();
+        if (requestThrottler != null) {
+            requestThrottler.throttleWake();
+        }
     }
     }
 
 
     public int getInProcess() {
     public int getInProcess() {
         return requestsInProcess.get();
         return requestsInProcess.get();
     }
     }
 
 
+    public int getInflight() {
+        return requestThrottleInflight();
+    }
+
+    private int requestThrottleInflight() {
+        if (requestThrottler != null) {
+            return requestThrottler.getInflight();
+        }
+        return 0;
+    }
+
     /**
     /**
      * This structure is used to facilitate information sharing between PrepRP
      * This structure is used to facilitate information sharing between PrepRP
      * and FinalRP.
      * and FinalRP.
@@ -910,6 +941,32 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
     }
     }
 
 
     public void submitRequest(Request si) {
     public void submitRequest(Request si) {
+        enqueueRequest(si);
+    }
+
+    public void enqueueRequest(Request si) {
+        if (requestThrottler == null) {
+            synchronized (this) {
+                try {
+                    // Since all requests are passed to the request
+                    // processor it should wait for setting up the request
+                    // processor chain. The state will be updated to RUNNING
+                    // after the setup.
+                    while (state == State.INITIAL) {
+                        wait(1000);
+                    }
+                } catch (InterruptedException e) {
+                    LOG.warn("Unexpected interruption", e);
+                }
+                if (requestThrottler == null) {
+                    throw new RuntimeException("Not started");
+                }
+            }
+        }
+        requestThrottler.submitRequest(si);
+    }
+
+    public void submitRequestNow(Request si) {
         if (firstProcessor == null) {
         if (firstProcessor == null) {
             synchronized (this) {
             synchronized (this) {
                 try {
                 try {
@@ -1224,7 +1281,7 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
     }
     }
 
 
     public boolean shouldThrottle(long outStandingCount) {
     public boolean shouldThrottle(long outStandingCount) {
-        if (getGlobalOutstandingLimit() < getInProcess()) {
+        if (getGlobalOutstandingLimit() < getInflight()) {
             return outStandingCount > 0;
             return outStandingCount > 0;
         }
         }
         return false;
         return false;

+ 47 - 0
zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerBean.java

@@ -291,6 +291,37 @@ public class ZooKeeperServerBean implements ZooKeeperServerMXBean, ZKMBeanInfo {
         ZooKeeperServer.setFlushDelay(delay);
         ZooKeeperServer.setFlushDelay(delay);
     }
     }
 
 
+    // Request throttling settings
+    ///////////////////////////////////////////////////////////////////////////
+
+    public int getRequestThrottleLimit() {
+        return RequestThrottler.getMaxRequests();
+    }
+
+    public void setRequestThrottleLimit(int requests) {
+        RequestThrottler.setMaxRequests(requests);
+    }
+
+    ///////////////////////////////////////////////////////////////////////////
+
+    public int getRequestThrottleStallTime() {
+        return RequestThrottler.getStallTime();
+    }
+
+    public void setRequestThrottleStallTime(int time) {
+        RequestThrottler.setStallTime(time);
+    }
+
+    ///////////////////////////////////////////////////////////////////////////
+
+    public boolean getRequestThrottleDropStale() {
+        return RequestThrottler.getDropStaleRequests();
+    }
+
+    public void setRequestThrottleDropStale(boolean drop) {
+        RequestThrottler.setDropStaleRequests(drop);
+    }
+
     ///////////////////////////////////////////////////////////////////////////
     ///////////////////////////////////////////////////////////////////////////
 
 
     @Override
     @Override
@@ -303,6 +334,14 @@ public class ZooKeeperServerBean implements ZooKeeperServerMXBean, ZKMBeanInfo {
         ZooKeeperServer.setMaxWriteQueuePollTime(delay);
         ZooKeeperServer.setMaxWriteQueuePollTime(delay);
     }
     }
 
 
+    public boolean getRequestStaleLatencyCheck() {
+        return Request.getStaleLatencyCheck();
+    }
+
+    public void setRequestStaleLatencyCheck(boolean check) {
+        Request.setStaleLatencyCheck(check);
+    }
+
     ///////////////////////////////////////////////////////////////////////////
     ///////////////////////////////////////////////////////////////////////////
 
 
     @Override
     @Override
@@ -314,4 +353,12 @@ public class ZooKeeperServerBean implements ZooKeeperServerMXBean, ZKMBeanInfo {
     public void setMaxBatchSize(int size) {
     public void setMaxBatchSize(int size) {
         ZooKeeperServer.setMaxBatchSize(size);
         ZooKeeperServer.setMaxBatchSize(size);
     }
     }
+
+    public boolean getRequestStaleConnectionCheck() {
+        return Request.getStaleConnectionCheck();
+    }
+
+    public void setRequestStaleConnectionCheck(boolean check) {
+        Request.setStaleConnectionCheck(check);
+    }
 }
 }

+ 15 - 0
zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerMXBean.java

@@ -120,6 +120,21 @@ public interface ZooKeeperServerMXBean {
     public double getConnectionDecreaseRatio();
     public double getConnectionDecreaseRatio();
     public void setConnectionDecreaseRatio(double val);
     public void setConnectionDecreaseRatio(double val);
 
 
+    public int getRequestThrottleLimit();
+    public void setRequestThrottleLimit(int requests);
+
+    public int getRequestThrottleStallTime();
+    public void setRequestThrottleStallTime(int time);
+
+    public boolean getRequestThrottleDropStale();
+    public void setRequestThrottleDropStale(boolean drop);
+
+    public boolean getRequestStaleLatencyCheck();
+    public void setRequestStaleLatencyCheck(boolean check);
+
+    public boolean getRequestStaleConnectionCheck();
+    public void setRequestStaleConnectionCheck(boolean check);
+
     /**
     /**
      * Reset packet and latency statistics 
      * Reset packet and latency statistics 
      */
      */

+ 242 - 0
zookeeper-server/src/test/java/org/apache/zookeeper/server/RequestThrottlerTest.java

@@ -0,0 +1,242 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server;
+
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.PortAssignment;
+import org.apache.zookeeper.ZKTestCase;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.common.Time;
+import org.apache.zookeeper.metrics.MetricsUtils;
+import org.apache.zookeeper.test.ClientBase;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.zookeeper.test.ClientBase.CONNECTION_TIMEOUT;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.lessThan;
+
+public class RequestThrottlerTest extends ZKTestCase {
+    private static final Logger LOG = LoggerFactory.getLogger(RequestThrottlerTest.class);
+
+    private static String HOSTPORT = "127.0.0.1:" + PortAssignment.unique();
+    private final static int TOTAL_REQUESTS = 5;
+    private final static int STALL_TIME = 5000;
+
+    // latch to hold requests in the PrepRequestProcessor to
+    // keep them from going down the pipeline to reach the final
+    // request processor, where the number of in process requests
+    // will be decreased
+    CountDownLatch resumeProcess = null;
+
+    // latch to make sure all requests are submitted
+    CountDownLatch submitted = null;
+
+    // latch to make sure all requests entered the pipeline
+    CountDownLatch entered = null;
+
+    ZooKeeperServer zks = null;
+    ServerCnxnFactory f = null;
+    ZooKeeper zk = null;
+
+    @Before
+    public void setup() throws Exception {
+        // start a server and create a client
+        File tmpDir = ClientBase.createTmpDir();
+        ClientBase.setupTestEnv();
+        zks = new TestZooKeeperServer(tmpDir, tmpDir, 3000);
+        final int PORT = Integer.parseInt(HOSTPORT.split(":")[1]);
+        f = ServerCnxnFactory.createFactory(PORT, -1);
+        f.startup(zks);
+        LOG.info("starting up the zookeeper server .. waiting");
+        Assert.assertTrue("waiting for server being up",
+                ClientBase.waitForServerUp(HOSTPORT, CONNECTION_TIMEOUT));
+
+        resumeProcess = null;
+        submitted = null;
+
+        zk = ClientBase.createZKClient(HOSTPORT);
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        // shut down the server and the client
+        if (null != zk) {
+            zk.close();
+        }
+
+        if (null != f) {
+            f.shutdown();
+        }
+        if (null != zks) {
+            zks.shutdown();
+        }
+    }
+
+    // TestZooKeeperServer
+    // 1. uses our version of PrepRequestProcessor, which can hold the request as long as we want
+    // 2. count the number of submitted requests
+    class TestZooKeeperServer extends ZooKeeperServer {
+        public TestZooKeeperServer(File snapDir, File logDir, int tickTime) throws IOException {
+            super(snapDir, logDir, tickTime);
+        }
+
+        @Override
+        protected void setupRequestProcessors() {
+            RequestProcessor finalProcessor = new FinalRequestProcessor(this);
+            RequestProcessor syncProcessor = new SyncRequestProcessor(this,
+                    finalProcessor);
+            ((SyncRequestProcessor) syncProcessor).start();
+            firstProcessor = new TestPrepRequestProcessor(this, syncProcessor);
+            ((TestPrepRequestProcessor) firstProcessor).start();
+        }
+
+        @Override
+        public void submitRequest(Request si) {
+            if (null != submitted) {
+                submitted.countDown();
+            }
+            super.submitRequest(si);
+        }
+    }
+
+    class TestPrepRequestProcessor extends PrepRequestProcessor {
+        public TestPrepRequestProcessor(ZooKeeperServer zks, RequestProcessor syncProcessor) {
+            super(zks, syncProcessor);
+        }
+
+        @Override
+        protected void pRequest(Request request) throws RequestProcessorException {
+            // keep the request in the processor as long as we want
+            if (resumeProcess != null) {
+                try {
+                    resumeProcess.await(20, TimeUnit.SECONDS);
+                } catch (Exception e) {
+
+                }
+            }
+
+            if (entered != null) {
+                entered.countDown();
+            }
+
+            super.pRequest(request);
+        }
+    }
+
+    @Test
+    public void testRequestThrottler() throws Exception {
+        ServerMetrics.getMetrics().resetAll();
+
+        // we only allow two requests in the pipeline
+        RequestThrottler.setMaxRequests(2);
+
+        RequestThrottler.setStallTime(STALL_TIME);
+        RequestThrottler.setDropStaleRequests(false);
+
+        // no requests can go through the pipeline unless we raise the latch
+        resumeProcess = new CountDownLatch(1);
+        submitted = new CountDownLatch(TOTAL_REQUESTS);
+        entered = new CountDownLatch(TOTAL_REQUESTS);
+
+        // send 5 requests asynchronously
+        for (int i =0; i < TOTAL_REQUESTS; i++) {
+            zk.create("/request_throttle_test- " + i , ("/request_throttle_test- " + i).getBytes(),
+                    ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, (rc, path, ctx, name) -> {}, null);
+        }
+
+        // make sure the server received all 5 requests
+        submitted.await(5, TimeUnit.SECONDS);
+        Map<String, Object> metrics = MetricsUtils.currentServerMetrics();
+
+        // but only two requests can get into the pipeline because of the throttler
+        Assert.assertEquals(2L, (long)metrics.get("prep_processor_request_queued"));
+        Assert.assertEquals(1L, (long)metrics.get("request_throttle_wait_count"));
+
+        // let the requests go through the pipeline and the throttler will be waken up to allow more requests
+        // to enter the pipeline
+        resumeProcess.countDown();
+        entered.await(STALL_TIME, TimeUnit.MILLISECONDS);
+
+        metrics = MetricsUtils.currentServerMetrics();
+        Assert.assertEquals(TOTAL_REQUESTS, (long)metrics.get("prep_processor_request_queued"));
+    }
+
+    @Test
+    public void testDropStaleRequests() throws Exception {
+        ServerMetrics.getMetrics().resetAll();
+
+        // we only allow two requests in the pipeline
+        RequestThrottler.setMaxRequests(2);
+
+        RequestThrottler.setStallTime(STALL_TIME);
+
+        RequestThrottler.setDropStaleRequests(true);
+
+        // no requests can go through the pipeline unless we raise the latch
+        resumeProcess = new CountDownLatch(1);
+        submitted = new CountDownLatch(TOTAL_REQUESTS);
+
+        // send 5 requests asynchronously
+        for (int i=0; i<TOTAL_REQUESTS; i++) {
+            zk.create("/request_throttle_test- " + i , ("/request_throttle_test- " + i).getBytes(),
+                    ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, (rc, path, ctx, name) -> {}, null);
+        }
+
+        // make sure the server received all 5 requests
+        submitted.await(5, TimeUnit.SECONDS);
+        Map<String, Object> metrics = MetricsUtils.currentServerMetrics();
+
+        // but only two requests can get into the pipeline because of the throttler
+        Assert.assertEquals(2L, (long)metrics.get("prep_processor_request_queued"));
+        Assert.assertEquals(1L, (long)metrics.get("request_throttle_wait_count"));
+
+        for (ServerCnxn cnxn : f.cnxns){
+            cnxn.setStale();
+        }
+        zk = null;
+
+        resumeProcess.countDown();
+        LOG.info("raise the latch");
+
+        while (zks.getInflight() > 0) {
+            Thread.sleep(50);
+        }
+
+        // the rest of the 3 requests will be dropped
+        // but only the first one for a connection will be counted
+        metrics = MetricsUtils.currentServerMetrics();
+        Assert.assertEquals(2L, (long)metrics.get("prep_processor_request_queued"));
+        Assert.assertEquals(1, (long)metrics.get("stale_requests_dropped"));
+    }
+}

+ 4 - 0
zookeeper-server/src/test/java/org/apache/zookeeper/server/SessionTrackerTest.java

@@ -31,6 +31,8 @@ import org.apache.zookeeper.server.SessionTrackerImpl.SessionImpl;
 import org.apache.zookeeper.test.ClientBase;
 import org.apache.zookeeper.test.ClientBase;
 import org.junit.Assert;
 import org.junit.Assert;
 import org.junit.Test;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 /**
 /**
  * Testing zk client session logic in sessiontracker
  * Testing zk client session logic in sessiontracker
@@ -48,6 +50,7 @@ public class SessionTrackerTest extends ZKTestCase {
      */
      */
     @Test(timeout = 20000)
     @Test(timeout = 20000)
     public void testAddSessionAfterSessionExpiry() throws Exception {
     public void testAddSessionAfterSessionExpiry() throws Exception {
+        RequestThrottler.setMaxRequests(0);
         ZooKeeperServer zks = setupSessionTracker();
         ZooKeeperServer zks = setupSessionTracker();
 
 
         latch = new CountDownLatch(1);
         latch = new CountDownLatch(1);
@@ -127,6 +130,7 @@ public class SessionTrackerTest extends ZKTestCase {
         // setup session tracker
         // setup session tracker
         zks.createSessionTracker();
         zks.createSessionTracker();
         zks.startSessionTracker();
         zks.startSessionTracker();
+        zks.startRequestThrottler();
         return zks;
         return zks;
     }
     }