Преглед изворни кода

ZOOKEEPER-3683: Discard requests that are delayed longer than a confi…

…gured threshold

Author: Jie Huang <jiehuang@fb.com>
Author: Ivailo Nedelchev <nedelchev@fb.com>

Reviewers: Michael Han <hanm@apache.org>, Allan Lyu <fangmin@apache.org>, Damien Diederen <dd@crosstwine.com>

Closes #1211 from jhuan31/ZOOKEEPER-3683
Jie Huang пре 5 година
родитељ
комит
e87bad6774
24 измењених фајлова са 695 додато и 24 уклоњено
  1. 2 1
      zookeeper-client/zookeeper-client-c/include/zookeeper.h
  2. 2 0
      zookeeper-client/zookeeper-client-c/src/zookeeper.c
  3. 9 0
      zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md
  4. 17 1
      zookeeper-server/src/main/java/org/apache/zookeeper/KeeperException.java
  5. 26 13
      zookeeper-server/src/main/java/org/apache/zookeeper/server/FinalRequestProcessor.java
  6. 15 3
      zookeeper-server/src/main/java/org/apache/zookeeper/server/PrepRequestProcessor.java
  7. 18 0
      zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java
  8. 15 1
      zookeeper-server/src/main/java/org/apache/zookeeper/server/RequestThrottler.java
  9. 7 0
      zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java
  10. 3 4
      zookeeper-server/src/main/java/org/apache/zookeeper/server/SyncRequestProcessor.java
  11. 12 0
      zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
  12. 10 0
      zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerBean.java
  13. 3 0
      zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerMXBean.java
  14. 16 1
      zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/CommitProcessor.java
  15. 3 0
      zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerRequestProcessor.java
  16. 6 0
      zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java
  17. 4 0
      zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java
  18. 4 0
      zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ObserverRequestProcessor.java
  19. 4 0
      zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumZooKeeperServer.java
  20. 60 0
      zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumBase.java
  21. 224 0
      zookeeper-server/src/test/java/org/apache/zookeeper/test/ThrottledOpHelper.java
  22. 73 0
      zookeeper-server/src/test/java/org/apache/zookeeper/test/ThrottledOpObserverTest.java
  23. 98 0
      zookeeper-server/src/test/java/org/apache/zookeeper/test/ThrottledOpQuorumTest.java
  24. 64 0
      zookeeper-server/src/test/java/org/apache/zookeeper/test/ThrottledOpStandaloneTest.java

+ 2 - 1
zookeeper-client/zookeeper-client-c/include/zookeeper.h

@@ -140,7 +140,8 @@ enum ZOO_ERRORS {
   ZEPHEMERALONLOCALSESSION = -120, /*!< Attempt to create ephemeral node on a local session */
   ZEPHEMERALONLOCALSESSION = -120, /*!< Attempt to create ephemeral node on a local session */
   ZNOWATCHER = -121, /*!< The watcher couldn't be found */
   ZNOWATCHER = -121, /*!< The watcher couldn't be found */
   ZRECONFIGDISABLED = -123, /*!< Attempts to perform a reconfiguration operation when reconfiguration feature is disabled */
   ZRECONFIGDISABLED = -123, /*!< Attempts to perform a reconfiguration operation when reconfiguration feature is disabled */
-  ZSESSIONCLOSEDREQUIRESASLAUTH = -124 /*!< The session has been closed by server because server requires client to do SASL authentication, but client is not configured with SASL authentication or configuted with SASL but failed (i.e. wrong credential used.). */
+  ZSESSIONCLOSEDREQUIRESASLAUTH = -124, /*!< The session has been closed by server because server requires client to do SASL authentication, but client is not configured with SASL authentication or configuted with SASL but failed (i.e. wrong credential used.). */
+  ZTHROTTLEDOP = -127 /*!< Operation was throttled and not executed at all. please, retry! */
 };
 };
 
 
 #ifdef __cplusplus
 #ifdef __cplusplus

+ 2 - 0
zookeeper-client/zookeeper-client-c/src/zookeeper.c

@@ -4902,6 +4902,8 @@ const char* zerror(int c)
       return "the watcher couldn't be found";
       return "the watcher couldn't be found";
     case ZRECONFIGDISABLED:
     case ZRECONFIGDISABLED:
       return "attempts to perform a reconfiguration operation when reconfiguration feature is disable";
       return "attempts to perform a reconfiguration operation when reconfiguration feature is disable";
+   case ZTHROTTLEDOP:
+     return "Operation was throttled due to high load";
     }
     }
     if (c > 0) {
     if (c > 0) {
       return strerror(c);
       return strerror(c);

+ 9 - 0
zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md

@@ -1133,6 +1133,15 @@ property, when available, is noted below.
     effect due to TLS handshake timeout when there are too many in-flight TLS 
     effect due to TLS handshake timeout when there are too many in-flight TLS 
     handshakes. Set it to something like 250 is good enough to avoid herd effect.
     handshakes. Set it to something like 250 is good enough to avoid herd effect.
 
 
+* *throttledOpWaitTime*
+    (Java system property: **zookeeper.throttled_op_wait_time**)
+    The time in the RequestThrottler queue longer than which a request will be marked as throttled.
+    A throttled requests will not be processed other than being fed down the pipeline of the server it belongs to
+    to preserve the order of all requests.
+    The FinalProcessor will issue an error response (new error code: ZTHROTTLEDOP) for these undigested requests.
+    The intent is for the clients not to retry them immediately.
+    When set to 0, no requests will be throttled. The default is 0.
+
 <a name="sc_clusterOptions"></a>
 <a name="sc_clusterOptions"></a>
 
 
 #### Cluster Options
 #### Cluster Options

+ 17 - 1
zookeeper-server/src/main/java/org/apache/zookeeper/KeeperException.java

@@ -148,6 +148,8 @@ public abstract class KeeperException extends Exception {
             return new SessionClosedRequireAuthException();
             return new SessionClosedRequireAuthException();
         case REQUESTTIMEOUT:
         case REQUESTTIMEOUT:
             return new RequestTimeoutException();
             return new RequestTimeoutException();
+        case THROTTLEDOP:
+            return new ThrottledOpException();
         case OK:
         case OK:
         default:
         default:
             throw new IllegalArgumentException("Invalid exception code");
             throw new IllegalArgumentException("Invalid exception code");
@@ -404,7 +406,11 @@ public abstract class KeeperException extends Exception {
         /** The session has been closed by server because server requires client to do SASL authentication,
         /** The session has been closed by server because server requires client to do SASL authentication,
          *  but client is not configured with SASL authentication or configuted with SASL but failed
          *  but client is not configured with SASL authentication or configuted with SASL but failed
          *  (i.e. wrong credential used.). */
          *  (i.e. wrong credential used.). */
-        SESSIONCLOSEDREQUIRESASLAUTH(-124);
+        SESSIONCLOSEDREQUIRESASLAUTH(-124),
+        /** Operation was throttled and not executed at all. This error code indicates that zookeeper server
+         *  is under heavy load and can't process incoming requests at full speed; please retry with back off.
+         */
+        THROTTLEDOP (-127);
 
 
         private static final Map<Integer, Code> lookup = new HashMap<Integer, Code>();
         private static final Map<Integer, Code> lookup = new HashMap<Integer, Code>();
 
 
@@ -495,6 +501,8 @@ public abstract class KeeperException extends Exception {
             return "Reconfig is disabled";
             return "Reconfig is disabled";
         case SESSIONCLOSEDREQUIRESASLAUTH:
         case SESSIONCLOSEDREQUIRESASLAUTH:
             return "Session closed because client failed to authenticate";
             return "Session closed because client failed to authenticate";
+        case THROTTLEDOP:
+            return "Op throttled due to high load";
         default:
         default:
             return "Unknown error " + code;
             return "Unknown error " + code;
         }
         }
@@ -940,4 +948,12 @@ public abstract class KeeperException extends Exception {
 
 
     }
     }
 
 
+    /**
+     * @see Code#THROTTLEDOP
+     */
+    public static class ThrottledOpException extends KeeperException {
+        public ThrottledOpException() {
+            super(Code.THROTTLEDOP);
+        }
+    }
 }
 }

+ 26 - 13
zookeeper-server/src/main/java/org/apache/zookeeper/server/FinalRequestProcessor.java

@@ -106,18 +106,7 @@ public class FinalRequestProcessor implements RequestProcessor {
         this.requestPathMetricsCollector = zks.getRequestPathMetricsCollector();
         this.requestPathMetricsCollector = zks.getRequestPathMetricsCollector();
     }
     }
 
 
-    public void processRequest(Request request) {
-        LOG.debug("Processing request:: {}", request);
-
-        // request.addRQRec(">final");
-        long traceMask = ZooTrace.CLIENT_REQUEST_TRACE_MASK;
-        if (request.type == OpCode.ping) {
-            traceMask = ZooTrace.SERVER_PING_TRACE_MASK;
-        }
-        if (LOG.isTraceEnabled()) {
-            ZooTrace.logRequest(LOG, traceMask, 'E', request, "");
-        }
-
+    private ProcessTxnResult applyRequest(Request request) {
         ProcessTxnResult rc = zks.processTxn(request);
         ProcessTxnResult rc = zks.processTxn(request);
 
 
         // ZOOKEEPER-558:
         // ZOOKEEPER-558:
@@ -131,7 +120,7 @@ public class FinalRequestProcessor implements RequestProcessor {
             // we are just playing diffs from the leader.
             // we are just playing diffs from the leader.
             if (closeSession(zks.serverCnxnFactory, request.sessionId)
             if (closeSession(zks.serverCnxnFactory, request.sessionId)
                 || closeSession(zks.secureServerCnxnFactory, request.sessionId)) {
                 || closeSession(zks.secureServerCnxnFactory, request.sessionId)) {
-                return;
+                return rc;
             }
             }
         }
         }
 
 
@@ -150,6 +139,24 @@ public class FinalRequestProcessor implements RequestProcessor {
             }
             }
         }
         }
 
 
+        return rc;
+    }
+
+    public void processRequest(Request request) {
+        LOG.debug("Processing request:: {}", request);
+
+        // request.addRQRec(">final");
+        long traceMask = ZooTrace.CLIENT_REQUEST_TRACE_MASK;
+        if (request.type == OpCode.ping) {
+            traceMask = ZooTrace.SERVER_PING_TRACE_MASK;
+        }
+        if (LOG.isTraceEnabled()) {
+            ZooTrace.logRequest(LOG, traceMask, 'E', request, "");
+        }
+        ProcessTxnResult rc = null;
+        if (!request.isThrottled()) {
+          rc = applyRequest(request);
+        }
         if (request.cnxn == null) {
         if (request.cnxn == null) {
             return;
             return;
         }
         }
@@ -195,7 +202,13 @@ public class FinalRequestProcessor implements RequestProcessor {
             if (request.isStale()) {
             if (request.isStale()) {
                 ServerMetrics.getMetrics().STALE_REPLIES.add(1);
                 ServerMetrics.getMetrics().STALE_REPLIES.add(1);
             }
             }
+
+            if (request.isThrottled()) {
+              throw KeeperException.create(Code.THROTTLEDOP);
+            }
+
             AuditHelper.addAuditLog(request, rc);
             AuditHelper.addAuditLog(request, rc);
+
             switch (request.type) {
             switch (request.type) {
             case OpCode.ping: {
             case OpCode.ping: {
                 lastOp = "PING";
                 lastOp = "PING";

+ 15 - 3
zookeeper-server/src/main/java/org/apache/zookeeper/server/PrepRequestProcessor.java

@@ -765,6 +765,21 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements Req
         request.setHdr(null);
         request.setHdr(null);
         request.setTxn(null);
         request.setTxn(null);
 
 
+        if (!request.isThrottled()) {
+          pRequestHelper(request);
+        }
+
+        request.zxid = zks.getZxid();
+        ServerMetrics.getMetrics().PREP_PROCESS_TIME.add(Time.currentElapsedTime() - request.prepStartTime);
+        nextProcessor.processRequest(request);
+    }
+
+    /**
+     * This method is a helper to pRequest method
+     *
+     * @param request
+     */
+    private void pRequestHelper(Request request) throws RequestProcessorException {
         try {
         try {
             switch (request.type) {
             switch (request.type) {
             case OpCode.createContainer:
             case OpCode.createContainer:
@@ -939,9 +954,6 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements Req
                 request.setTxn(new ErrorTxn(Code.MARSHALLINGERROR.intValue()));
                 request.setTxn(new ErrorTxn(Code.MARSHALLINGERROR.intValue()));
             }
             }
         }
         }
-        request.zxid = zks.getZxid();
-        ServerMetrics.getMetrics().PREP_PROCESS_TIME.add(Time.currentElapsedTime() - request.prepStartTime);
-        nextProcessor.processRequest(request);
     }
     }
 
 
     private static List<ACL> removeDuplicates(final List<ACL> acls) {
     private static List<ACL> removeDuplicates(final List<ACL> acls) {

+ 18 - 0
zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java

@@ -100,6 +100,8 @@ public class Request {
 
 
     public long syncQueueStartTime;
     public long syncQueueStartTime;
 
 
+    public long requestThrottleQueueTime;
+
     private Object owner;
     private Object owner;
 
 
     private KeeperException e;
     private KeeperException e;
@@ -108,6 +110,22 @@ public class Request {
 
 
     private TxnDigest txnDigest;
     private TxnDigest txnDigest;
 
 
+    private boolean isThrottledFlag = false;
+
+    public boolean isThrottled() {
+      return isThrottledFlag;
+    }
+
+    public void setIsThrottled(boolean val) {
+      isThrottledFlag = val;
+    }
+
+    public boolean isThrottlable() {
+        return this.type != OpCode.ping
+                && this.type != OpCode.closeSession
+                && this.type != OpCode.createSession;
+    }
+
     /**
     /**
      * If this is a create or close request for a local-only session.
      * If this is a create or close request for a local-only session.
      */
      */

+ 15 - 1
zookeeper-server/src/main/java/org/apache/zookeeper/server/RequestThrottler.java

@@ -20,6 +20,7 @@ package org.apache.zookeeper.server;
 
 
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
+import org.apache.zookeeper.common.Time;
 import org.apache.zookeeper.util.ServiceUtils;
 import org.apache.zookeeper.util.ServiceUtils;
 import org.slf4j.Logger;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.LoggerFactory;
@@ -97,6 +98,13 @@ public class RequestThrottler extends ZooKeeperCriticalThread {
      */
      */
     private static volatile boolean dropStaleRequests = Boolean.parseBoolean(System.getProperty("zookeeper.request_throttle_drop_stale", "true"));
     private static volatile boolean dropStaleRequests = Boolean.parseBoolean(System.getProperty("zookeeper.request_throttle_drop_stale", "true"));
 
 
+    protected boolean shouldThrottleOp(Request request, long elapsedTime) {
+        return request.isThrottlable()
+                && zks.getThrottledOpWaitTime() > 0
+                && elapsedTime > zks.getThrottledOpWaitTime();
+    }
+
+
     public RequestThrottler(ZooKeeperServer zks) {
     public RequestThrottler(ZooKeeperServer zks) {
         super("RequestThrottler", zks.getZooKeeperServerListener());
         super("RequestThrottler", zks.getZooKeeperServerListener());
         this.zks = zks;
         this.zks = zks;
@@ -171,6 +179,12 @@ public class RequestThrottler extends ZooKeeperCriticalThread {
                     if (request.isStale()) {
                     if (request.isStale()) {
                         ServerMetrics.getMetrics().STALE_REQUESTS.add(1);
                         ServerMetrics.getMetrics().STALE_REQUESTS.add(1);
                     }
                     }
+                    final long elapsedTime = Time.currentElapsedTime() - request.requestThrottleQueueTime;
+                    ServerMetrics.getMetrics().REQUEST_THROTTLE_QUEUE_TIME.add(elapsedTime);
+                    if (shouldThrottleOp(request, elapsedTime)) {
+                      request.setIsThrottled(true);
+                      ServerMetrics.getMetrics().THROTTLED_OPS.add(1);
+                    }
                     zks.submitRequestNow(request);
                     zks.submitRequestNow(request);
                 }
                 }
             }
             }
@@ -230,6 +244,7 @@ public class RequestThrottler extends ZooKeeperCriticalThread {
             LOG.debug("Shutdown in progress. Request cannot be processed");
             LOG.debug("Shutdown in progress. Request cannot be processed");
             dropRequest(request);
             dropRequest(request);
         } else {
         } else {
+            request.requestThrottleQueueTime = Time.currentElapsedTime();
             submittedRequests.add(request);
             submittedRequests.add(request);
         }
         }
     }
     }
@@ -238,7 +253,6 @@ public class RequestThrottler extends ZooKeeperCriticalThread {
         return submittedRequests.size();
         return submittedRequests.size();
     }
     }
 
 
-    @SuppressFBWarnings("DM_EXIT")
     public void shutdown() {
     public void shutdown() {
         // Try to shutdown gracefully
         // Try to shutdown gracefully
         LOG.info("Shutting down");
         LOG.info("Shutting down");

+ 7 - 0
zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java

@@ -154,6 +154,8 @@ public final class ServerMetrics {
         READS_ISSUED_IN_COMMIT_PROC = metricsContext.getSummary("read_commit_proc_issued", DetailLevel.BASIC);
         READS_ISSUED_IN_COMMIT_PROC = metricsContext.getSummary("read_commit_proc_issued", DetailLevel.BASIC);
         WRITES_ISSUED_IN_COMMIT_PROC = metricsContext.getSummary("write_commit_proc_issued", DetailLevel.BASIC);
         WRITES_ISSUED_IN_COMMIT_PROC = metricsContext.getSummary("write_commit_proc_issued", DetailLevel.BASIC);
 
 
+        THROTTLED_OPS = metricsContext.getCounter("throttled_ops");
+
         /**
         /**
          * Time spent by a read request in the commit processor.
          * Time spent by a read request in the commit processor.
          */
          */
@@ -223,6 +225,7 @@ public final class ServerMetrics {
         STALE_REQUESTS = metricsContext.getCounter("stale_requests");
         STALE_REQUESTS = metricsContext.getCounter("stale_requests");
         STALE_REQUESTS_DROPPED = metricsContext.getCounter("stale_requests_dropped");
         STALE_REQUESTS_DROPPED = metricsContext.getCounter("stale_requests_dropped");
         STALE_REPLIES = metricsContext.getCounter("stale_replies");
         STALE_REPLIES = metricsContext.getCounter("stale_replies");
+        REQUEST_THROTTLE_QUEUE_TIME = metricsContext.getSummary("request_throttle_queue_time_ms", DetailLevel.ADVANCED);
         REQUEST_THROTTLE_WAIT_COUNT = metricsContext.getCounter("request_throttle_wait_count");
         REQUEST_THROTTLE_WAIT_COUNT = metricsContext.getCounter("request_throttle_wait_count");
         LARGE_REQUESTS_REJECTED = metricsContext.getCounter("large_requests_rejected");
         LARGE_REQUESTS_REJECTED = metricsContext.getCounter("large_requests_rejected");
 
 
@@ -381,6 +384,9 @@ public final class ServerMetrics {
     public final Summary READS_ISSUED_IN_COMMIT_PROC;
     public final Summary READS_ISSUED_IN_COMMIT_PROC;
     public final Summary WRITES_ISSUED_IN_COMMIT_PROC;
     public final Summary WRITES_ISSUED_IN_COMMIT_PROC;
 
 
+    // Request op throttling related
+    public final Counter THROTTLED_OPS;
+
     /**
     /**
      * Time spent by a read request in the commit processor.
      * Time spent by a read request in the commit processor.
      */
      */
@@ -435,6 +441,7 @@ public final class ServerMetrics {
     public final Counter STALE_REQUESTS;
     public final Counter STALE_REQUESTS;
     public final Counter STALE_REQUESTS_DROPPED;
     public final Counter STALE_REQUESTS_DROPPED;
     public final Counter STALE_REPLIES;
     public final Counter STALE_REPLIES;
+    public final Summary REQUEST_THROTTLE_QUEUE_TIME;
     public final Counter REQUEST_THROTTLE_WAIT_COUNT;
     public final Counter REQUEST_THROTTLE_WAIT_COUNT;
     public final Counter LARGE_REQUESTS_REJECTED;
     public final Counter LARGE_REQUESTS_REJECTED;
 
 

+ 3 - 4
zookeeper-server/src/main/java/org/apache/zookeeper/server/SyncRequestProcessor.java

@@ -178,7 +178,7 @@ public class SyncRequestProcessor extends ZooKeeperCriticalThread implements Req
                 ServerMetrics.getMetrics().SYNC_PROCESSOR_QUEUE_TIME.add(startProcessTime - si.syncQueueStartTime);
                 ServerMetrics.getMetrics().SYNC_PROCESSOR_QUEUE_TIME.add(startProcessTime - si.syncQueueStartTime);
 
 
                 // track the number of records written to the log
                 // track the number of records written to the log
-                if (zks.getZKDatabase().append(si)) {
+                if (!si.isThrottled() && zks.getZKDatabase().append(si)) {
                     if (shouldSnapshot()) {
                     if (shouldSnapshot()) {
                         resetSnapshotStats();
                         resetSnapshotStats();
                         // roll the log
                         // roll the log
@@ -202,9 +202,8 @@ public class SyncRequestProcessor extends ZooKeeperCriticalThread implements Req
                     }
                     }
                 } else if (toFlush.isEmpty()) {
                 } else if (toFlush.isEmpty()) {
                     // optimization for read heavy workloads
                     // optimization for read heavy workloads
-                    // iff this is a read, and there are no pending
-                    // flushes (writes), then just pass this to the next
-                    // processor
+                    // iff this is a read or a throttled request(which doesn't need to be written to the disk),
+                    // and there are no pending flushes (writes), then just pass this to the next processor
                     if (nextProcessor != null) {
                     if (nextProcessor != null) {
                         nextProcessor.processRequest(si);
                         nextProcessor.processRequest(si);
                         if (nextProcessor instanceof Flushable) {
                         if (nextProcessor instanceof Flushable) {

+ 12 - 0
zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java

@@ -155,6 +155,9 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
 
 
     public static final int DEFAULT_TICK_TIME = 3000;
     public static final int DEFAULT_TICK_TIME = 3000;
     protected int tickTime = DEFAULT_TICK_TIME;
     protected int tickTime = DEFAULT_TICK_TIME;
+    public static final int DEFAULT_THROTTLED_OP_WAIT_TIME = 0; // disabled
+    protected static volatile int throttledOpWaitTime =
+        Integer.getInteger("zookeeper.throttled_op_wait_time", DEFAULT_THROTTLED_OP_WAIT_TIME);
     /** value of -1 indicates unset, use default */
     /** value of -1 indicates unset, use default */
     protected int minSessionTimeout = -1;
     protected int minSessionTimeout = -1;
     /** value of -1 indicates unset, use default */
     /** value of -1 indicates unset, use default */
@@ -1237,6 +1240,15 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
         this.tickTime = tickTime;
         this.tickTime = tickTime;
     }
     }
 
 
+    public static int getThrottledOpWaitTime() {
+        return throttledOpWaitTime;
+    }
+
+    public static void setThrottledOpWaitTime(int time) {
+        LOG.info("throttledOpWaitTime set to {}", time);
+        throttledOpWaitTime = time;
+    }
+
     public int getMinSessionTimeout() {
     public int getMinSessionTimeout() {
         return minSessionTimeout;
         return minSessionTimeout;
     }
     }

+ 10 - 0
zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerBean.java

@@ -315,6 +315,16 @@ public class ZooKeeperServerBean implements ZooKeeperServerMXBean, ZKMBeanInfo {
     // Request throttling settings
     // Request throttling settings
     ///////////////////////////////////////////////////////////////////////////
     ///////////////////////////////////////////////////////////////////////////
 
 
+    public int getThrottledOpWaitTime() {
+        return zks.getThrottledOpWaitTime();
+    }
+
+    public void setThrottledOpWaitTime(int val) {
+        zks.setThrottledOpWaitTime(val);
+    }
+
+    ///////////////////////////////////////////////////////////////////////////
+
     public int getRequestThrottleLimit() {
     public int getRequestThrottleLimit() {
         return RequestThrottler.getMaxRequests();
         return RequestThrottler.getMaxRequests();
     }
     }

+ 3 - 0
zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerMXBean.java

@@ -136,6 +136,9 @@ public interface ZooKeeperServerMXBean {
     boolean getRequestThrottleDropStale();
     boolean getRequestThrottleDropStale();
     void setRequestThrottleDropStale(boolean drop);
     void setRequestThrottleDropStale(boolean drop);
 
 
+    int getThrottledOpWaitTime();
+    void setThrottledOpWaitTime(int val);
+
     boolean getRequestStaleLatencyCheck();
     boolean getRequestStaleLatencyCheck();
     void setRequestStaleLatencyCheck(boolean check);
     void setRequestStaleLatencyCheck(boolean check);
 
 

+ 16 - 1
zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/CommitProcessor.java

@@ -29,12 +29,14 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.zookeeper.ZooDefs.OpCode;
 import org.apache.zookeeper.ZooDefs.OpCode;
 import org.apache.zookeeper.common.Time;
 import org.apache.zookeeper.common.Time;
+import org.apache.zookeeper.server.ExitCode;
 import org.apache.zookeeper.server.Request;
 import org.apache.zookeeper.server.Request;
 import org.apache.zookeeper.server.RequestProcessor;
 import org.apache.zookeeper.server.RequestProcessor;
 import org.apache.zookeeper.server.ServerMetrics;
 import org.apache.zookeeper.server.ServerMetrics;
 import org.apache.zookeeper.server.WorkerService;
 import org.apache.zookeeper.server.WorkerService;
 import org.apache.zookeeper.server.ZooKeeperCriticalThread;
 import org.apache.zookeeper.server.ZooKeeperCriticalThread;
 import org.apache.zookeeper.server.ZooKeeperServerListener;
 import org.apache.zookeeper.server.ZooKeeperServerListener;
+import org.apache.zookeeper.util.ServiceUtils;
 import org.slf4j.Logger;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.LoggerFactory;
 
 
@@ -165,6 +167,9 @@ public class CommitProcessor extends ZooKeeperCriticalThread implements RequestP
     }
     }
 
 
     protected boolean needCommit(Request request) {
     protected boolean needCommit(Request request) {
+        if (request.isThrottled()) {
+          return false;
+        }
         switch (request.type) {
         switch (request.type) {
         case OpCode.create:
         case OpCode.create:
         case OpCode.create2:
         case OpCode.create2:
@@ -306,6 +311,11 @@ public class CommitProcessor extends ZooKeeperCriticalThread implements RequestP
                         // Process committed head
                         // Process committed head
                         request = committedRequests.peek();
                         request = committedRequests.peek();
 
 
+                        if (request.isThrottled()) {
+                            LOG.error("Throttled request in committed pool: {}. Exiting.", request);
+                            ServiceUtils.requestSystemExit(ExitCode.UNEXPECTED_ERROR.getValue());
+                        }
+
                         /*
                         /*
                          * Check if this is a local write request is pending,
                          * Check if this is a local write request is pending,
                          * if so, update it with the committed info. If the commit matches
                          * if so, update it with the committed info. If the commit matches
@@ -349,6 +359,10 @@ public class CommitProcessor extends ZooKeeperCriticalThread implements RequestP
                                 topPending.zxid = request.zxid;
                                 topPending.zxid = request.zxid;
                                 topPending.commitRecvTime = request.commitRecvTime;
                                 topPending.commitRecvTime = request.commitRecvTime;
                                 request = topPending;
                                 request = topPending;
+                                if (request.isThrottled()) {
+                                    LOG.error("Throttled request in committed & pending pool: {}. Exiting.", request);
+                                    ServiceUtils.requestSystemExit(ExitCode.UNEXPECTED_ERROR.getValue());
+                                }
                                 // Only decrement if we take a request off the queue.
                                 // Only decrement if we take a request off the queue.
                                 numWriteQueuedRequests.decrementAndGet();
                                 numWriteQueuedRequests.decrementAndGet();
                                 queuedWriteRequests.poll();
                                 queuedWriteRequests.poll();
@@ -452,7 +466,8 @@ public class CommitProcessor extends ZooKeeperCriticalThread implements RequestP
      */
      */
     private void sendToNextProcessor(Request request) {
     private void sendToNextProcessor(Request request) {
         numRequestsProcessing.incrementAndGet();
         numRequestsProcessing.incrementAndGet();
-        workerPool.schedule(new CommitWorkRequest(request), request.sessionId);
+        CommitWorkRequest workRequest = new CommitWorkRequest(request);
+        workerPool.schedule(workRequest, request.sessionId);
     }
     }
 
 
     private void processWrite(Request request) throws RequestProcessorException {
     private void processWrite(Request request) throws RequestProcessorException {

+ 3 - 0
zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerRequestProcessor.java

@@ -73,6 +73,9 @@ public class FollowerRequestProcessor extends ZooKeeperCriticalThread implements
                 // the request to the leader so that we are ready to receive
                 // the request to the leader so that we are ready to receive
                 // the response
                 // the response
                 nextProcessor.processRequest(request);
                 nextProcessor.processRequest(request);
+                if (request.isThrottled()) {
+                    continue;
+                }
 
 
                 // We now ship the request to the leader. As with all
                 // We now ship the request to the leader. As with all
                 // other quorum operations, sync also follows this code
                 // other quorum operations, sync also follows this code

+ 6 - 0
zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java

@@ -56,6 +56,7 @@ import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.ZooDefs.OpCode;
 import org.apache.zookeeper.ZooDefs.OpCode;
 import org.apache.zookeeper.common.Time;
 import org.apache.zookeeper.common.Time;
 import org.apache.zookeeper.jmx.MBeanRegistry;
 import org.apache.zookeeper.jmx.MBeanRegistry;
+import org.apache.zookeeper.server.ExitCode;
 import org.apache.zookeeper.server.FinalRequestProcessor;
 import org.apache.zookeeper.server.FinalRequestProcessor;
 import org.apache.zookeeper.server.Request;
 import org.apache.zookeeper.server.Request;
 import org.apache.zookeeper.server.RequestProcessor;
 import org.apache.zookeeper.server.RequestProcessor;
@@ -68,6 +69,7 @@ import org.apache.zookeeper.server.quorum.auth.QuorumAuthServer;
 import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
 import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
 import org.apache.zookeeper.server.util.SerializeUtils;
 import org.apache.zookeeper.server.util.SerializeUtils;
 import org.apache.zookeeper.server.util.ZxidUtils;
 import org.apache.zookeeper.server.util.ZxidUtils;
+import org.apache.zookeeper.util.ServiceUtils;
 import org.slf4j.Logger;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.LoggerFactory;
 
 
@@ -1218,6 +1220,10 @@ public class Leader extends LearnerMaster {
      * @return the proposal that is queued to send to all the members
      * @return the proposal that is queued to send to all the members
      */
      */
     public Proposal propose(Request request) throws XidRolloverException {
     public Proposal propose(Request request) throws XidRolloverException {
+        if (request.isThrottled()) {
+            LOG.error("Throttled request send as proposal: {}. Exiting.", request);
+            ServiceUtils.requestSystemExit(ExitCode.UNEXPECTED_ERROR.getValue());
+        }
         /**
         /**
          * Address the rollover issue. All lower 32bits set indicate a new leader
          * Address the rollover issue. All lower 32bits set indicate a new leader
          * election. Force a re-election instead. See ZOOKEEPER-1277
          * election. Force a re-election instead. See ZOOKEEPER-1277

+ 4 - 0
zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java

@@ -234,6 +234,10 @@ public class Learner {
      * @throws IOException
      * @throws IOException
      */
      */
     void request(Request request) throws IOException {
     void request(Request request) throws IOException {
+        if (request.isThrottled()) {
+            LOG.error("Throttled request sent to leader: {}. Exiting", request);
+            ServiceUtils.requestSystemExit(ExitCode.UNEXPECTED_ERROR.getValue());
+        }
         ByteArrayOutputStream baos = new ByteArrayOutputStream();
         ByteArrayOutputStream baos = new ByteArrayOutputStream();
         DataOutputStream oa = new DataOutputStream(baos);
         DataOutputStream oa = new DataOutputStream(baos);
         oa.writeLong(request.sessionId);
         oa.writeLong(request.sessionId);

+ 4 - 0
zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ObserverRequestProcessor.java

@@ -82,6 +82,10 @@ public class ObserverRequestProcessor extends ZooKeeperCriticalThread implements
                 // the response
                 // the response
                 nextProcessor.processRequest(request);
                 nextProcessor.processRequest(request);
 
 
+                if (request.isThrottled()) {
+                    continue;
+                }
+
                 // We now ship the request to the leader. As with all
                 // We now ship the request to the leader. As with all
                 // other quorum operations, sync also follows this code
                 // other quorum operations, sync also follows this code
                 // path, but different from others, we need to keep track
                 // path, but different from others, we need to keep track

+ 4 - 0
zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumZooKeeperServer.java

@@ -60,6 +60,10 @@ public abstract class QuorumZooKeeperServer extends ZooKeeperServer {
     }
     }
 
 
     public Request checkUpgradeSession(Request request) throws IOException, KeeperException {
     public Request checkUpgradeSession(Request request) throws IOException, KeeperException {
+        if (request.isThrottled()) {
+            return null;
+        }
+
         // If this is a request for a local session and it is to
         // If this is a request for a local session and it is to
         // create an ephemeral node, then upgrade the session and return
         // create an ephemeral node, then upgrade the session and return
         // a new session request for the leader.
         // a new session request for the leader.

+ 60 - 0
zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumBase.java

@@ -247,6 +247,66 @@ public class QuorumBase extends ClientBase {
         return -1;
         return -1;
     }
     }
 
 
+    public int getLeaderClientPort() {
+      if (s1.getPeerState() == ServerState.LEADING) {
+        return portClient1;
+      } else if (s2.getPeerState() == ServerState.LEADING) {
+        return portClient2;
+      } else if (s3.getPeerState() == ServerState.LEADING) {
+        return portClient3;
+      } else if (s4.getPeerState() == ServerState.LEADING) {
+        return portClient4;
+      } else if (s5.getPeerState() == ServerState.LEADING) {
+        return portClient5;
+      }
+      return -1;
+    }
+
+    public QuorumPeer getLeaderQuorumPeer() {
+      if (s1.getPeerState() == ServerState.LEADING) {
+        return s1;
+      } else if (s2.getPeerState() == ServerState.LEADING) {
+        return s2;
+      } else if (s3.getPeerState() == ServerState.LEADING) {
+        return s3;
+      } else if (s4.getPeerState() == ServerState.LEADING) {
+        return s4;
+      } else if (s5.getPeerState() == ServerState.LEADING) {
+        return s5;
+      }
+      return null;
+    }
+
+    public QuorumPeer getFirstObserver() {
+      if (s1.getLearnerType() == LearnerType.OBSERVER) {
+        return s1;
+      } else if (s2.getLearnerType() == LearnerType.OBSERVER) {
+        return s2;
+      } else if (s3.getLearnerType() == LearnerType.OBSERVER) {
+        return s3;
+      } else if (s4.getLearnerType() == LearnerType.OBSERVER) {
+        return s4;
+      } else if (s5.getLearnerType() == LearnerType.OBSERVER) {
+        return s5;
+      }
+      return null;
+    }
+
+    public int getFirstObserverClientPort() {
+      if (s1.getLearnerType() == LearnerType.OBSERVER) {
+        return portClient1;
+      } else if (s2.getLearnerType() == LearnerType.OBSERVER) {
+        return portClient2;
+      } else if (s3.getLearnerType() == LearnerType.OBSERVER) {
+        return portClient3;
+      } else if (s4.getLearnerType() == LearnerType.OBSERVER) {
+        return portClient4;
+      } else if (s5.getLearnerType() == LearnerType.OBSERVER) {
+        return portClient5;
+      }
+      return -1;
+    }
+
     public String getPeersMatching(ServerState state) {
     public String getPeersMatching(ServerState state) {
         StringBuilder hosts = new StringBuilder();
         StringBuilder hosts = new StringBuilder();
         for (QuorumPeer p : getPeerList()) {
         for (QuorumPeer p : getPeerList()) {

+ 224 - 0
zookeeper-server/src/test/java/org/apache/zookeeper/test/ThrottledOpHelper.java

@@ -0,0 +1,224 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import mockit.Mock;
+import mockit.MockUp;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Stat;
+import org.apache.zookeeper.server.Request;
+import org.apache.zookeeper.server.RequestThrottler;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.junit.Assert;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ThrottledOpHelper {
+    protected static final Logger LOG = LoggerFactory.getLogger(ThrottledOpHelper.class);
+
+    public static final class RequestThrottleMock extends MockUp<RequestThrottler> {
+        public static void throttleEveryNthOp(int n) {
+            everyNthOp = n;
+            opCounter = 0;
+        }
+        private static int everyNthOp = 0;
+        private static int opCounter = 0;
+
+        @Mock
+        private boolean shouldThrottleOp(Request request, long elapsedTime) {
+            if (everyNthOp > 0 && request.isThrottlable() && (++opCounter % everyNthOp == 0)) {
+                opCounter %= everyNthOp;
+                return true;
+            }
+            return false;
+        }
+    }
+
+    public static void applyMockUps() {
+        new RequestThrottleMock();
+    }
+
+    public void testThrottledOp(ZooKeeper zk, ZooKeeperServer zs) throws IOException, InterruptedException, KeeperException {
+        final int N = 5; // must be greater than 3
+        final int COUNT = 100;
+        RequestThrottleMock.throttleEveryNthOp(N);
+        LOG.info("Before create /ivailo nodes");
+        int opCount = 0;
+        for (int i = 0; i < COUNT; i++) {
+            String nodeName = "/ivailo" + i;
+            if (opCount % N == N - 1) {
+                try {
+                    zk.create(nodeName, "".getBytes(), Ids.OPEN_ACL_UNSAFE,
+                        (i % 2 == 0) ? CreateMode.PERSISTENT : CreateMode.EPHEMERAL);
+                    Assert.fail("Should have gotten ThrottledOp exception");
+                } catch (KeeperException.ThrottledOpException e) {
+                    // anticipated outcome
+                    Stat stat = zk.exists(nodeName, null);
+                    Assert.assertNull(stat);
+                    zk.create(nodeName, "".getBytes(), Ids.OPEN_ACL_UNSAFE,
+                        (i % 2 == 0) ? CreateMode.PERSISTENT : CreateMode.EPHEMERAL);
+                } catch (KeeperException e) {
+                    Assert.fail("Should have gotten ThrottledOp exception");
+                }
+                opCount += 3; // three ops issued
+            } else {
+                zk.create(nodeName, "".getBytes(), Ids.OPEN_ACL_UNSAFE,
+                    (i % 2 == 0) ? CreateMode.PERSISTENT : CreateMode.EPHEMERAL);
+                opCount++; // one op issued
+            }
+            if (opCount % N == N - 1) {
+                try {
+                    zk.setData(nodeName, nodeName.getBytes(), -1);
+                    Assert.fail("Should have gotten ThrottledOp exception");
+                } catch (KeeperException.ThrottledOpException e) {
+                    // anticipated outcome & retry
+                    zk.setData(nodeName, nodeName.getBytes(), -1);
+                } catch (KeeperException e) {
+                    Assert.fail("Should have gotten ThrottledOp exception");
+                }
+                opCount += 2; // two ops issued, one for retry
+            } else {
+                zk.setData(nodeName, nodeName.getBytes(), -1);
+                opCount++; // one op issued
+            }
+        }
+        LOG.info("Before delete /ivailo nodes");
+        for (int i = 0; i < COUNT; i++) {
+            String nodeName = "/ivailo" + i;
+            if (opCount % N == N - 1) {
+                try {
+                    zk.exists(nodeName, null);
+                    Assert.fail("Should have gotten ThrottledOp exception");
+                } catch (KeeperException.ThrottledOpException e) {
+                    // anticipated outcome & retry
+                    Stat stat = zk.exists(nodeName, null);
+                    Assert.assertNotNull(stat);
+                    opCount += 2; // two ops issued, one is retry
+                } catch (KeeperException e) {
+                    Assert.fail("Should have gotten ThrottledOp exception");
+                }
+            } else {
+                Stat stat = zk.exists(nodeName, null);
+                Assert.assertNotNull(stat);
+                opCount++;
+            }
+            if (opCount % N == N - 1) {
+                try {
+                    zk.getData(nodeName, null, null);
+                    Assert.fail("Should have gotten ThrottledOp exception");
+                } catch (KeeperException.ThrottledOpException e) {
+                    // anticipated outcome & retry
+                    byte[] data = zk.getData(nodeName, null, null);
+                    Assert.assertEquals(nodeName, new String(data));
+                    opCount += 2; // two ops issued, one is retry
+                } catch (KeeperException e) {
+                    Assert.fail("Should have gotten ThrottledOp exception");
+                }
+            } else {
+                byte[] data = zk.getData(nodeName, null, null);
+                Assert.assertEquals(nodeName, new String(data));
+                opCount++;
+            }
+            if (opCount % N == N - 1) {
+                try {
+                    // version 0 should not trigger BadVersion exception
+                    zk.delete(nodeName, 0);
+                    Assert.fail("Should have gotten ThrottledOp exception");
+                } catch (KeeperException.ThrottledOpException e) {
+                    // anticipated outcome & retry
+                    zk.delete(nodeName, -1);
+                } catch (KeeperException e) {
+                    Assert.fail("Should have gotten ThrottledOp exception");
+                }
+                opCount += 2; // two ops issues, one for retry
+            } else {
+                zk.delete(nodeName, -1);
+                opCount++; // one op only issued
+            }
+            if (opCount % N == N - 1) {
+                try {
+                    zk.exists(nodeName, null);
+                    Assert.fail("Should have gotten ThrottledOp exception");
+                } catch (KeeperException.ThrottledOpException e) {
+                    // anticipated outcome & retry
+                    Stat stat = zk.exists(nodeName, null);
+                    Assert.assertNull(stat);
+                    opCount += 2; // two ops issued, one is retry
+                } catch (KeeperException e) {
+                    Assert.fail("Should have gotten ThrottledOp exception");
+                }
+            } else {
+                Stat stat = zk.exists(nodeName, null);
+                Assert.assertNull(stat);
+                opCount++;
+            }
+        }
+        LOG.info("After delete /ivailo");
+        zk.close();
+    }
+
+    public void testThrottledAcl(ZooKeeper zk, ZooKeeperServer zs) throws Exception {
+        RequestThrottleMock.throttleEveryNthOp(0);
+
+        final ArrayList<ACL> ACL_PERMS =
+          new ArrayList<ACL>() { {
+            add(new ACL(ZooDefs.Perms.READ, ZooDefs.Ids.ANYONE_ID_UNSAFE));
+            add(new ACL(ZooDefs.Perms.WRITE, ZooDefs.Ids.ANYONE_ID_UNSAFE));
+            add(new ACL(ZooDefs.Perms.ALL, ZooDefs.Ids.AUTH_IDS));
+        }};
+        String path = "/path1";
+        zk.create(path, path.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        zk.addAuthInfo("digest", "pat:test".getBytes());
+        List<ACL> defaultAcls = zk.getACL(path, null);
+        Assert.assertEquals(1, defaultAcls.size());
+
+        RequestThrottleMock.throttleEveryNthOp(2);
+
+        path = "/path2";
+        zk.create(path, path.getBytes(), Ids.OPEN_ACL_UNSAFE,
+            CreateMode.PERSISTENT);
+        try {
+            zk.setACL(path, ACL_PERMS, -1);
+            Assert.fail("Should have gotten ThrottledOp exception");
+        } catch (KeeperException.ThrottledOpException e) {
+            // expected
+        } catch (KeeperException e) {
+            Assert.fail("Should have gotten ThrottledOp exception");
+        }
+        List<ACL> acls = zk.getACL(path, null);
+        Assert.assertEquals(1, acls.size());
+
+        RequestThrottleMock.throttleEveryNthOp(0);
+
+        path = "/path3";
+        zk.create(path, path.getBytes(), Ids.OPEN_ACL_UNSAFE,
+            CreateMode.PERSISTENT);
+        zk.setACL(path, ACL_PERMS, -1);
+        acls = zk.getACL(path, null);
+        Assert.assertEquals(3, acls.size());
+    }
+}

+ 73 - 0
zookeeper-server/src/test/java/org/apache/zookeeper/test/ThrottledOpObserverTest.java

@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test;
+
+import java.io.IOException;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class ThrottledOpObserverTest extends QuorumBase {
+    @BeforeClass
+    public static void applyMockUps() {
+        ThrottledOpHelper.applyMockUps();
+    }
+
+    @Override
+    public void setUp() throws Exception {
+        super.setUp(true /* withObservers */);
+    }
+
+    @Override
+    public void tearDown() throws Exception {
+        super.tearDown();
+    }
+
+    @Test
+    public void testThrottledOpObserver() throws IOException, InterruptedException, KeeperException {
+        ZooKeeper zk = null;
+        try {
+            zk = createClient("localhost:" + getFirstObserverClientPort());
+            ZooKeeperServer zs = getFirstObserver().getActiveServer();
+            ThrottledOpHelper test = new ThrottledOpHelper();
+            test.testThrottledOp(zk, zs);
+        } finally {
+            if (zk != null) {
+                zk.close();
+            }
+        }
+    }
+
+    @Test
+    public void testThrottledAclObserver() throws Exception {
+        ZooKeeper zk = null;
+        try {
+            zk = createClient("localhost:" + getFirstObserverClientPort());
+            ZooKeeperServer zs = getFirstObserver().getActiveServer();
+            ThrottledOpHelper test = new ThrottledOpHelper();
+            test.testThrottledAcl(zk, zs);
+        } finally {
+            if (zk != null) {
+                zk.close();
+            }
+        }
+    }
+}

+ 98 - 0
zookeeper-server/src/test/java/org/apache/zookeeper/test/ThrottledOpQuorumTest.java

@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test;
+
+import java.io.IOException;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.apache.zookeeper.server.quorum.QuorumPeer;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class ThrottledOpQuorumTest extends QuorumBase {
+    @BeforeClass
+    public static void applyMockUps() {
+        ThrottledOpHelper.applyMockUps();
+    }
+
+    @Test
+    public void testThrottledOpLeader() throws IOException, InterruptedException, KeeperException {
+        ZooKeeper zk = null;
+        try {
+            zk = createClient("localhost:" + getLeaderClientPort());
+            ZooKeeperServer zs = getLeaderQuorumPeer().getActiveServer();
+            ThrottledOpHelper test = new ThrottledOpHelper();
+            test.testThrottledOp(zk, zs);
+        } finally {
+            if (zk != null) {
+                zk.close();
+            }
+        }
+    }
+
+    @Test
+    public void testThrottledAclLeader() throws Exception {
+        ZooKeeper zk = null;
+        try {
+            zk = createClient("localhost:" + getLeaderClientPort());
+            ZooKeeperServer zs = getLeaderQuorumPeer().getActiveServer();
+            ThrottledOpHelper test = new ThrottledOpHelper();
+            test.testThrottledAcl(zk, zs);
+        } finally {
+            if (zk != null) {
+                zk.close();
+            }
+        }
+    }
+
+    @Test
+    public void testThrottledOpFollower() throws IOException, InterruptedException, KeeperException {
+        ZooKeeper zk = null;
+        try {
+            int clientPort = (getLeaderClientPort() == portClient1) ? portClient2 : portClient1;
+            zk = createClient("localhost:" + clientPort);
+            QuorumPeer qp = (getLeaderClientPort() == portClient1) ? s2 : s1;
+            ZooKeeperServer zs = qp.getActiveServer();
+            ThrottledOpHelper test = new ThrottledOpHelper();
+            test.testThrottledOp(zk, zs);
+        } finally {
+            if (zk != null) {
+                zk.close();
+            }
+        }
+    }
+
+    @Test
+    public void testThrottledAclFollower() throws Exception {
+        ZooKeeper zk = null;
+        try {
+            int clientPort = (getLeaderClientPort() == portClient1) ? portClient2 : portClient1;
+            zk = createClient("localhost:" + clientPort);
+            QuorumPeer qp = (getLeaderClientPort() == portClient1) ? s2 : s1;
+            ZooKeeperServer zs = qp.getActiveServer();
+            ThrottledOpHelper test = new ThrottledOpHelper();
+            test.testThrottledAcl(zk, zs);
+        } finally {
+            if (zk != null) {
+                zk.close();
+            }
+        }
+    }
+}

+ 64 - 0
zookeeper-server/src/test/java/org/apache/zookeeper/test/ThrottledOpStandaloneTest.java

@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test;
+
+import java.io.IOException;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class ThrottledOpStandaloneTest extends ClientBase {
+
+    @BeforeClass
+    public static void applyMockUps() {
+        ThrottledOpHelper.applyMockUps();
+    }
+
+    @Test
+    public void testThrottledOp() throws IOException, InterruptedException, KeeperException {
+        ZooKeeper zk = null;
+        try {
+            zk = createClient(hostPort);
+            ZooKeeperServer zs = serverFactory.getZooKeeperServer();
+            ThrottledOpHelper test = new ThrottledOpHelper();
+            test.testThrottledOp(zk, zs);
+        } finally {
+            if (zk != null) {
+                zk.close();
+            }
+        }
+    }
+
+    @Test
+    public void testThrottledAcl() throws Exception {
+        ZooKeeper zk = null;
+        try {
+            zk = createClient(hostPort);
+            ZooKeeperServer zs = serverFactory.getZooKeeperServer();
+            ThrottledOpHelper test = new ThrottledOpHelper();
+            test.testThrottledAcl(zk, zs);
+        } finally {
+            if (zk != null) {
+                zk.close();
+            }
+        }
+    }
+}