Переглянути джерело

ZOOKEEPER-3492: Add weights to server side connection throttling

Author: Jie Huang <jiehuang@fb.com>

Reviewers: Michael Han <hanm@apache.org>, Enrico Olivelli <eolivelli@gmail.com>

Closes #1037 from jhuan31/ZOOKEEPER-3492
Jie Huang 5 роки тому
батько
коміт
eecd9e7ce0

+ 27 - 6
zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md

@@ -124,8 +124,8 @@ is no full support.
 
 
 #### Required Software
 #### Required Software
 
 
-ZooKeeper runs in Java, release 1.8 or greater 
-(JDK 8 LTS, JDK 11 LTS, JDK 12 - Java 9 and 10 are not supported). 
+ZooKeeper runs in Java, release 1.8 or greater
+(JDK 8 LTS, JDK 11 LTS, JDK 12 - Java 9 and 10 are not supported).
 It runs as an _ensemble_ of ZooKeeper servers. Three
 It runs as an _ensemble_ of ZooKeeper servers. Three
 ZooKeeper servers is the minimum recommended size for an
 ZooKeeper servers is the minimum recommended size for an
 ensemble, and we also recommend that they run on separate
 ensemble, and we also recommend that they run on separate
@@ -822,6 +822,27 @@ property, when available, is noted below.
     dropping. This parameter defines the threshold to decrease the dropping
     dropping. This parameter defines the threshold to decrease the dropping
     probability. The default is 0.
     probability. The default is 0.
 
 
+* *zookeeper.connection_throttle_weight_enabled* :
+    (Java system property only)
+    **New in 3.6.0:**
+    Whether to consider connection weights when throttling. Only useful when connection throttle is enabled, that is, connectionMaxTokens is larger than 0. The default is false.
+
+* *zookeeper.connection_throttle_global_session_weight* :
+    (Java system property only)
+    **New in 3.6.0:**
+    The weight of a global session. It is the number of tokens required for a global session request to get through the connection throttler. It has to be a positive integer no smaller than the weight of a local session. The default is 3.
+
+* *zookeeper.connection_throttle_local_session_weight* :
+    (Java system property only)
+    **New in 3.6.0:**
+    The weight of a local session. It is the number of tokens required for a local session request to get through the connection throttler. It has to be a positive integer no larger than the weight of a global session or a renew session. The default is 1.
+
+* *zookeeper.connection_throttle_renew_session_weight* :
+    (Java system property only)
+    **New in 3.6.0:**
+    The weight of renewing a session. It is also the number of tokens required for a reconnect request to get through the throttler. It has to be a positive integer no smaller than the weight of a local session. The default is 2.
+
+
  * *clientPortListenBacklog* :
  * *clientPortListenBacklog* :
     **New in 3.4.14, 3.5.5, 3.6.0:**
     **New in 3.4.14, 3.5.5, 3.6.0:**
     The socket backlog length for the ZooKeeper server socket. This controls
     The socket backlog length for the ZooKeeper server socket. This controls
@@ -889,7 +910,7 @@ property, when available, is noted below.
 
 
 * *advancedFlowControlEnabled* :
 * *advancedFlowControlEnabled* :
     (Java system property: **zookeeper.netty.advancedFlowControl.enabled**)
     (Java system property: **zookeeper.netty.advancedFlowControl.enabled**)
-    Using accurate flow control in netty based on the status of ZooKeeper 
+    Using accurate flow control in netty based on the status of ZooKeeper
     pipeline to avoid direct buffer OOM. It will disable the AUTO_READ in
     pipeline to avoid direct buffer OOM. It will disable the AUTO_READ in
     Netty.
     Netty.
 
 
@@ -958,9 +979,9 @@ of servers -- that is, when deploying clusters of servers.
 * *connectToLearnerMasterLimit* :
 * *connectToLearnerMasterLimit* :
     (Java system property: zookeeper.**connectToLearnerMasterLimit**)
     (Java system property: zookeeper.**connectToLearnerMasterLimit**)
     Amount of time, in ticks (see [tickTime](#id_tickTime)), to allow followers to
     Amount of time, in ticks (see [tickTime](#id_tickTime)), to allow followers to
-    connect to the leader after leader election. Defaults to the value of initLimit. 
+    connect to the leader after leader election. Defaults to the value of initLimit.
     Use when initLimit is high so connecting to learner master doesn't result in higher timeout.
     Use when initLimit is high so connecting to learner master doesn't result in higher timeout.
-        
+
 * *leaderServes* :
 * *leaderServes* :
     (Java system property: zookeeper.**leaderServes**)
     (Java system property: zookeeper.**leaderServes**)
     Leader accepts client connections. Default value is "yes".
     Leader accepts client connections. Default value is "yes".
@@ -1568,7 +1589,7 @@ options are used to configure the [AdminServer](#sc_adminserver).
 
 
 ### Metrics Providers
 ### Metrics Providers
 
 
-**New in 3.6.0:** The following options are used to configure metrics. 
+**New in 3.6.0:** The following options are used to configure metrics.
 
 
  By default ZooKeeper server exposes useful metrics using the [AdminServer](#sc_adminserver).
  By default ZooKeeper server exposes useful metrics using the [AdminServer](#sc_adminserver).
  and [Four Letter Words](#sc_4lw) interface.
  and [Four Letter Words](#sc_4lw) interface.

+ 109 - 10
zookeeper-server/src/main/java/org/apache/zookeeper/server/BlueThrottle.java

@@ -20,6 +20,8 @@ package org.apache.zookeeper.server;
 
 
 import java.util.Random;
 import java.util.Random;
 import org.apache.zookeeper.common.Time;
 import org.apache.zookeeper.common.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 /**
 /**
  * Implements a token-bucket based rate limiting mechanism with optional
  * Implements a token-bucket based rate limiting mechanism with optional
@@ -69,6 +71,7 @@ import org.apache.zookeeper.common.Time;
  **/
  **/
 
 
 public class BlueThrottle {
 public class BlueThrottle {
+    private static final Logger LOG = LoggerFactory.getLogger(BlueThrottle.class);
 
 
     private int maxTokens;
     private int maxTokens;
     private int fillTime;
     private int fillTime;
@@ -86,35 +89,115 @@ public class BlueThrottle {
     Random rng;
     Random rng;
 
 
     public static final String CONNECTION_THROTTLE_TOKENS = "zookeeper.connection_throttle_tokens";
     public static final String CONNECTION_THROTTLE_TOKENS = "zookeeper.connection_throttle_tokens";
-    public static final int DEFAULT_CONNECTION_THROTTLE_TOKENS;
+    private static final int DEFAULT_CONNECTION_THROTTLE_TOKENS;
 
 
     public static final String CONNECTION_THROTTLE_FILL_TIME = "zookeeper.connection_throttle_fill_time";
     public static final String CONNECTION_THROTTLE_FILL_TIME = "zookeeper.connection_throttle_fill_time";
-    public static final int DEFAULT_CONNECTION_THROTTLE_FILL_TIME;
+    private static final int DEFAULT_CONNECTION_THROTTLE_FILL_TIME;
 
 
     public static final String CONNECTION_THROTTLE_FILL_COUNT = "zookeeper.connection_throttle_fill_count";
     public static final String CONNECTION_THROTTLE_FILL_COUNT = "zookeeper.connection_throttle_fill_count";
-    public static final int DEFAULT_CONNECTION_THROTTLE_FILL_COUNT;
+    private static final int DEFAULT_CONNECTION_THROTTLE_FILL_COUNT;
 
 
     public static final String CONNECTION_THROTTLE_FREEZE_TIME = "zookeeper.connection_throttle_freeze_time";
     public static final String CONNECTION_THROTTLE_FREEZE_TIME = "zookeeper.connection_throttle_freeze_time";
-    public static final int DEFAULT_CONNECTION_THROTTLE_FREEZE_TIME;
+    private static final int DEFAULT_CONNECTION_THROTTLE_FREEZE_TIME;
 
 
     public static final String CONNECTION_THROTTLE_DROP_INCREASE = "zookeeper.connection_throttle_drop_increase";
     public static final String CONNECTION_THROTTLE_DROP_INCREASE = "zookeeper.connection_throttle_drop_increase";
-    public static final double DEFAULT_CONNECTION_THROTTLE_DROP_INCREASE;
+    private static final double DEFAULT_CONNECTION_THROTTLE_DROP_INCREASE;
 
 
     public static final String CONNECTION_THROTTLE_DROP_DECREASE = "zookeeper.connection_throttle_drop_decrease";
     public static final String CONNECTION_THROTTLE_DROP_DECREASE = "zookeeper.connection_throttle_drop_decrease";
-    public static final double DEFAULT_CONNECTION_THROTTLE_DROP_DECREASE;
+    private static final double DEFAULT_CONNECTION_THROTTLE_DROP_DECREASE;
 
 
     public static final String CONNECTION_THROTTLE_DECREASE_RATIO = "zookeeper.connection_throttle_decrease_ratio";
     public static final String CONNECTION_THROTTLE_DECREASE_RATIO = "zookeeper.connection_throttle_decrease_ratio";
-    public static final double DEFAULT_CONNECTION_THROTTLE_DECREASE_RATIO;
+    private static final double DEFAULT_CONNECTION_THROTTLE_DECREASE_RATIO;
+
+    public static final String WEIGHED_CONNECTION_THROTTLE = "zookeeper.connection_throttle_weight_enabled";
+    private static boolean connectionWeightEnabled;
+
+    public static final String GLOBAL_SESSION_WEIGHT = "zookeeper.connection_throttle_global_session_weight";
+    private static final int DEFAULT_GLOBAL_SESSION_WEIGHT;
+
+    public static final String LOCAL_SESSION_WEIGHT = "zookeeper.connection_throttle_local_session_weight";
+    private static final int DEFAULT_LOCAL_SESSION_WEIGHT;
+
+    public static final String RENEW_SESSION_WEIGHT = "zookeeper.connection_throttle_renew_session_weight";
+    private static final int DEFAULT_RENEW_SESSION_WEIGHT;
+
+    // for unit tests only
+    protected  static void setConnectionWeightEnabled(boolean enabled) {
+        connectionWeightEnabled = enabled;
+        logWeighedThrottlingSetting();
+    }
+
+    private static void logWeighedThrottlingSetting() {
+        if (connectionWeightEnabled) {
+            LOG.info("Weighed connection throttling is enabled. "
+                    + "But it will only be effective if connection throttling is enabled");
+            LOG.info(
+                    "The weights for different session types are: global {} renew {} local {}",
+                    DEFAULT_GLOBAL_SESSION_WEIGHT,
+                    DEFAULT_RENEW_SESSION_WEIGHT,
+                    DEFAULT_LOCAL_SESSION_WEIGHT
+            );
+        } else {
+            LOG.info("Weighed connection throttling is disabled");
+        }
+    }
 
 
     static {
     static {
-        DEFAULT_CONNECTION_THROTTLE_TOKENS = Integer.getInteger(CONNECTION_THROTTLE_TOKENS, 0);
-        DEFAULT_CONNECTION_THROTTLE_FILL_TIME = Integer.getInteger(CONNECTION_THROTTLE_FILL_TIME, 1);
-        DEFAULT_CONNECTION_THROTTLE_FILL_COUNT = Integer.getInteger(CONNECTION_THROTTLE_FILL_COUNT, 1);
+        int tokens = Integer.getInteger(CONNECTION_THROTTLE_TOKENS, 0);
+        int fillCount = Integer.getInteger(CONNECTION_THROTTLE_FILL_COUNT, 1);
+
+        connectionWeightEnabled = Boolean.getBoolean(WEIGHED_CONNECTION_THROTTLE);
+
+        // if not specified, the weights for a global session, a local session, and a renew session
+        // are 3, 1, 2 respectively. The weight for a global session is 3 because in our connection benchmarking,
+        // the throughput of global sessions is about one third of that of local sessions. Renewing a session
+        // requires is more expensive than establishing a local session and cheaper than creating a global session so
+        // its default weight is set to 2.
+        int globalWeight = Integer.getInteger(GLOBAL_SESSION_WEIGHT, 3);
+        int localWeight = Integer.getInteger(LOCAL_SESSION_WEIGHT, 1);
+        int renewWeight = Integer.getInteger(RENEW_SESSION_WEIGHT, 2);
+
+        if (globalWeight <= 0) {
+            LOG.warn("Invalid global session weight {}. It should be larger than 0", globalWeight);
+            DEFAULT_GLOBAL_SESSION_WEIGHT = 3;
+        } else if (globalWeight < localWeight) {
+            LOG.warn("The global session weight {} is less than the local session weight {}. Use the local session weight.",
+                    globalWeight, localWeight);
+            DEFAULT_GLOBAL_SESSION_WEIGHT = localWeight;
+        } else {
+            DEFAULT_GLOBAL_SESSION_WEIGHT = globalWeight;
+        }
 
 
+        if (localWeight <= 0) {
+            LOG.warn("Invalid local session weight {}. It should be larger than 0", localWeight);
+            DEFAULT_LOCAL_SESSION_WEIGHT = 1;
+        } else {
+            DEFAULT_LOCAL_SESSION_WEIGHT = localWeight;
+        }
+
+        if (renewWeight <= 0) {
+            LOG.warn("Invalid renew session weight {}. It should be larger than 0", renewWeight);
+            DEFAULT_RENEW_SESSION_WEIGHT = 2;
+        } else if (renewWeight < localWeight) {
+            LOG.warn("The renew session weight {} is less than the local session weight {}. Use the local session weight.",
+                    renewWeight, localWeight);
+            DEFAULT_RENEW_SESSION_WEIGHT = localWeight;
+        } else {
+            DEFAULT_RENEW_SESSION_WEIGHT = renewWeight;
+        }
+
+        // This is based on the assumption that tokens set in config are for global sessions
+        DEFAULT_CONNECTION_THROTTLE_TOKENS = connectionWeightEnabled
+                ? DEFAULT_GLOBAL_SESSION_WEIGHT * tokens : tokens;
+        DEFAULT_CONNECTION_THROTTLE_FILL_TIME = Integer.getInteger(CONNECTION_THROTTLE_FILL_TIME, 1);
+        DEFAULT_CONNECTION_THROTTLE_FILL_COUNT = connectionWeightEnabled
+                ? DEFAULT_GLOBAL_SESSION_WEIGHT * fillCount : fillCount;
         DEFAULT_CONNECTION_THROTTLE_FREEZE_TIME = Integer.getInteger(CONNECTION_THROTTLE_FREEZE_TIME, -1);
         DEFAULT_CONNECTION_THROTTLE_FREEZE_TIME = Integer.getInteger(CONNECTION_THROTTLE_FREEZE_TIME, -1);
         DEFAULT_CONNECTION_THROTTLE_DROP_INCREASE = getDoubleProp(CONNECTION_THROTTLE_DROP_INCREASE, 0.02);
         DEFAULT_CONNECTION_THROTTLE_DROP_INCREASE = getDoubleProp(CONNECTION_THROTTLE_DROP_INCREASE, 0.02);
         DEFAULT_CONNECTION_THROTTLE_DROP_DECREASE = getDoubleProp(CONNECTION_THROTTLE_DROP_DECREASE, 0.002);
         DEFAULT_CONNECTION_THROTTLE_DROP_DECREASE = getDoubleProp(CONNECTION_THROTTLE_DROP_DECREASE, 0.002);
         DEFAULT_CONNECTION_THROTTLE_DECREASE_RATIO = getDoubleProp(CONNECTION_THROTTLE_DECREASE_RATIO, 0);
         DEFAULT_CONNECTION_THROTTLE_DECREASE_RATIO = getDoubleProp(CONNECTION_THROTTLE_DECREASE_RATIO, 0);
+
+        logWeighedThrottlingSetting();
     }
     }
 
 
     /* Varation of Integer.getInteger for real number properties */
     /* Varation of Integer.getInteger for real number properties */
@@ -212,6 +295,22 @@ public class BlueThrottle {
         return maxTokens - tokens;
         return maxTokens - tokens;
     }
     }
 
 
+    public int getRequiredTokensForGlobal() {
+        return BlueThrottle.DEFAULT_GLOBAL_SESSION_WEIGHT;
+    }
+
+    public int getRequiredTokensForLocal() {
+        return BlueThrottle.DEFAULT_LOCAL_SESSION_WEIGHT;
+    }
+
+    public int getRequiredTokensForRenew() {
+        return BlueThrottle.DEFAULT_RENEW_SESSION_WEIGHT;
+    }
+
+    public boolean isConnectionWeightEnabled() {
+        return BlueThrottle.connectionWeightEnabled;
+    }
+
     public synchronized boolean checkLimit(int need) {
     public synchronized boolean checkLimit(int need) {
         // A maxTokens setting of zero disables throttling
         // A maxTokens setting of zero disables throttling
         if (maxTokens == 0) {
         if (maxTokens == 0) {

+ 1 - 0
zookeeper-server/src/main/java/org/apache/zookeeper/server/SessionTracker.java

@@ -137,4 +137,5 @@ public interface SessionTracker {
      */
      */
     long getLocalSessionCount();
     long getLocalSessionCount();
 
 
+    boolean isLocalSessionsEnabled();
 }
 }

+ 4 - 0
zookeeper-server/src/main/java/org/apache/zookeeper/server/SessionTrackerImpl.java

@@ -342,4 +342,8 @@ public class SessionTrackerImpl extends ZooKeeperCriticalThread implements Sessi
         return 0;
         return 0;
     }
     }
 
 
+    @Override
+    public boolean isLocalSessionsEnabled() {
+        return false;
+    }
 }
 }

+ 27 - 7
zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java

@@ -148,6 +148,7 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
     protected String initialConfig;
     protected String initialConfig;
     private final RequestPathMetricsCollector requestPathMetricsCollector;
     private final RequestPathMetricsCollector requestPathMetricsCollector;
 
 
+    private boolean localSessionEnabled = false;
     protected enum State {
     protected enum State {
         INITIAL,
         INITIAL,
         RUNNING,
         RUNNING,
@@ -598,7 +599,10 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
         registerMetrics();
         registerMetrics();
 
 
         setState(State.RUNNING);
         setState(State.RUNNING);
+
         requestPathMetricsCollector.start();
         requestPathMetricsCollector.start();
+
+        localSessionEnabled = sessionTracker.isLocalSessionsEnabled();
         notifyAll();
         notifyAll();
     }
     }
 
 
@@ -1212,12 +1216,9 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
         return connThrottle.getDropChance();
         return connThrottle.getDropChance();
     }
     }
 
 
-    public void processConnectRequest(ServerCnxn cnxn, ByteBuffer incomingBuffer) throws IOException, ClientCnxnLimitException {
-
-        if (!connThrottle.checkLimit(1)) {
-            throw new ClientCnxnLimitException();
-        }
-        ServerMetrics.getMetrics().CONNECTION_TOKEN_DEFICIT.add(connThrottle.getDeficit());
+    @SuppressFBWarnings(value = "IS2_INCONSISTENT_SYNC", justification = "the value won't change after startup")
+    public void processConnectRequest(ServerCnxn cnxn, ByteBuffer incomingBuffer)
+        throws IOException, ClientCnxnLimitException {
 
 
         BinaryInputArchive bia = BinaryInputArchive.getArchive(new ByteBufferInputStream(incomingBuffer));
         BinaryInputArchive bia = BinaryInputArchive.getArchive(new ByteBufferInputStream(incomingBuffer));
         ConnectRequest connReq = new ConnectRequest();
         ConnectRequest connReq = new ConnectRequest();
@@ -1226,7 +1227,27 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
             LOG.debug("Session establishment request from client " + cnxn.getRemoteSocketAddress()
             LOG.debug("Session establishment request from client " + cnxn.getRemoteSocketAddress()
                       + " client's lastZxid is 0x" + Long.toHexString(connReq.getLastZxidSeen()));
                       + " client's lastZxid is 0x" + Long.toHexString(connReq.getLastZxidSeen()));
         }
         }
+        long sessionId = connReq.getSessionId();
+        int tokensNeeded = 1;
+        if (connThrottle.isConnectionWeightEnabled()) {
+            if (sessionId == 0) {
+                if (localSessionEnabled) {
+                    tokensNeeded = connThrottle.getRequiredTokensForLocal();
+                } else {
+                    tokensNeeded = connThrottle.getRequiredTokensForGlobal();
+                }
+            } else {
+                tokensNeeded = connThrottle.getRequiredTokensForRenew();
+            }
+        }
+
+        if (!connThrottle.checkLimit(tokensNeeded)) {
+            throw new ClientCnxnLimitException();
+        }
+        ServerMetrics.getMetrics().CONNECTION_TOKEN_DEFICIT.add(connThrottle.getDeficit());
+
         ServerMetrics.getMetrics().CONNECTION_REQUEST_COUNT.add(1);
         ServerMetrics.getMetrics().CONNECTION_REQUEST_COUNT.add(1);
+
         boolean readOnly = false;
         boolean readOnly = false;
         try {
         try {
             readOnly = bia.readBool("readOnly");
             readOnly = bia.readBool("readOnly");
@@ -1269,7 +1290,6 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
         // We don't want to receive any packets until we are sure that the
         // We don't want to receive any packets until we are sure that the
         // session is setup
         // session is setup
         cnxn.disableRecv();
         cnxn.disableRecv();
-        long sessionId = connReq.getSessionId();
         if (sessionId == 0) {
         if (sessionId == 0) {
             long id = createSession(cnxn, passwd, sessionTimeout);
             long id = createSession(cnxn, passwd, sessionTimeout);
             if (LOG.isDebugEnabled()) {
             if (LOG.isDebugEnabled()) {

+ 0 - 1
zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LeaderSessionTracker.java

@@ -38,7 +38,6 @@ public class LeaderSessionTracker extends UpgradeableSessionTracker {
 
 
     private static final Logger LOG = LoggerFactory.getLogger(LeaderSessionTracker.class);
     private static final Logger LOG = LoggerFactory.getLogger(LeaderSessionTracker.class);
 
 
-    private final boolean localSessionsEnabled;
     private final SessionTrackerImpl globalSessionTracker;
     private final SessionTrackerImpl globalSessionTracker;
 
 
     /**
     /**

+ 5 - 0
zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/UpgradeableSessionTracker.java

@@ -55,6 +55,11 @@ public abstract class UpgradeableSessionTracker implements SessionTracker {
         return localSessionTracker != null && localSessionTracker.isTrackingSession(sessionId);
         return localSessionTracker != null && localSessionTracker.isTrackingSession(sessionId);
     }
     }
 
 
+    @Override
+    public boolean isLocalSessionsEnabled() {
+        return localSessionsEnabled;
+    }
+
     public boolean isUpgradingSession(long sessionId) {
     public boolean isUpgradingSession(long sessionId) {
         return upgradingSessions != null && upgradingSessions.containsKey(sessionId);
         return upgradingSessions != null && upgradingSessions.containsKey(sessionId);
     }
     }

+ 134 - 0
zookeeper-server/src/test/java/org/apache/zookeeper/server/BlueThrottleTest.java

@@ -22,7 +22,12 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.assertTrue;
 import java.util.Random;
 import java.util.Random;
+import java.util.concurrent.TimeoutException;
 import org.apache.zookeeper.ZKTestCase;
 import org.apache.zookeeper.ZKTestCase;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.test.ClientBase;
+import org.apache.zookeeper.test.QuorumUtil;
+import org.junit.Assert;
 import org.junit.Test;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.LoggerFactory;
@@ -30,6 +35,7 @@ import org.slf4j.LoggerFactory;
 public class BlueThrottleTest extends ZKTestCase {
 public class BlueThrottleTest extends ZKTestCase {
 
 
     private static final Logger LOG = LoggerFactory.getLogger(BlueThrottleTest.class);
     private static final Logger LOG = LoggerFactory.getLogger(BlueThrottleTest.class);
+    private static final int RAPID_TIMEOUT = 10000;
 
 
     class MockRandom extends Random {
     class MockRandom extends Random {
 
 
@@ -162,4 +168,132 @@ public class BlueThrottleTest extends ZKTestCase {
         assertTrue("Later requests should have a chance", accepted > 0);
         assertTrue("Later requests should have a chance", accepted > 0);
     }
     }
 
 
+    private QuorumUtil quorumUtil = new QuorumUtil(1);
+    private ClientBase.CountdownWatcher[] watchers;
+    private ZooKeeper[] zks;
+
+    private int connect(int n) throws Exception {
+        String connStr = quorumUtil.getConnectionStringForServer(1);
+        int connected = 0;
+
+        zks = new ZooKeeper[n];
+        watchers = new ClientBase.CountdownWatcher[n];
+        for (int i = 0; i < n; i++){
+            watchers[i] = new ClientBase.CountdownWatcher();
+            zks[i] = new ZooKeeper(connStr, 3000, watchers[i]);
+            try {
+                watchers[i].waitForConnected(RAPID_TIMEOUT);
+                connected++;
+            } catch (TimeoutException e) {
+                LOG.info("Connection denied by the throttler due to insufficient tokens");
+                break;
+            }
+        }
+
+        return connected;
+    }
+
+    private void shutdownQuorum() throws Exception{
+        for (ZooKeeper zk : zks) {
+            if (zk != null) {
+                zk.close();
+            }
+        }
+
+        quorumUtil.shutdownAll();
+    }
+
+    @Test
+    public void testNoThrottling() throws Exception {
+        quorumUtil.startAll();
+
+        //disable throttling
+        quorumUtil.getPeer(1).peer.getActiveServer().connThrottle().setMaxTokens(0);
+
+        int connected = connect(10);
+
+        Assert.assertEquals(10, connected);
+        shutdownQuorum();
+    }
+
+    @Test
+    public void testThrottling() throws Exception {
+        quorumUtil.enableLocalSession(true);
+        quorumUtil.startAll();
+
+        quorumUtil.getPeer(1).peer.getActiveServer().connThrottle().setMaxTokens(2);
+        //no refill, makes testing easier
+        quorumUtil.getPeer(1).peer.getActiveServer().connThrottle().setFillCount(0);
+
+
+        int connected = connect(3);
+        Assert.assertEquals(2, connected);
+        shutdownQuorum();
+
+        quorumUtil.enableLocalSession(false);
+        quorumUtil.startAll();
+
+        quorumUtil.getPeer(1).peer.getActiveServer().connThrottle().setMaxTokens(2);
+        //no refill, makes testing easier
+        quorumUtil.getPeer(1).peer.getActiveServer().connThrottle().setFillCount(0);
+
+
+        connected = connect(3);
+        Assert.assertEquals(2, connected);
+        shutdownQuorum();
+    }
+
+    @Test
+    public void testWeighedThrottling() throws Exception {
+        // this test depends on the session weights set to the default values
+        // 3 for global session, 2 for renew sessions, 1 for local sessions
+        BlueThrottle.setConnectionWeightEnabled(true);
+
+        quorumUtil.enableLocalSession(true);
+        quorumUtil.startAll();
+        quorumUtil.getPeer(1).peer.getActiveServer().connThrottle().setMaxTokens(10);
+        quorumUtil.getPeer(1).peer.getActiveServer().connThrottle().setFillCount(0);
+
+        //try to create 11 local sessions, 10 created, because we have only 10 tokens
+        int connected = connect(11);
+        Assert.assertEquals(10, connected);
+        shutdownQuorum();
+
+        quorumUtil.enableLocalSession(false);
+        quorumUtil.startAll();
+        quorumUtil.getPeer(1).peer.getActiveServer().connThrottle().setMaxTokens(10);
+        quorumUtil.getPeer(1).peer.getActiveServer().connThrottle().setFillCount(0);
+        //tyr to create 11 global sessions, 3 created, because we have 10 tokens and each connection needs 3
+        connected = connect(11);
+        Assert.assertEquals(3, connected);
+        shutdownQuorum();
+
+        quorumUtil.startAll();
+        quorumUtil.getPeer(1).peer.getActiveServer().connThrottle().setMaxTokens(10);
+        quorumUtil.getPeer(1).peer.getActiveServer().connThrottle().setFillCount(0);
+        connected = connect(2);
+        Assert.assertEquals(2, connected);
+
+        quorumUtil.shutdown(1);
+        watchers[0].waitForDisconnected(RAPID_TIMEOUT);
+        watchers[1].waitForDisconnected(RAPID_TIMEOUT);
+
+        quorumUtil.restart(1);
+        //client will try to reconnect
+        quorumUtil.getPeer(1).peer.getActiveServer().connThrottle().setMaxTokens(3);
+        quorumUtil.getPeer(1).peer.getActiveServer().connThrottle().setFillCount(0);
+        int reconnected = 0;
+        for (int i = 0; i < 2; i++){
+            try {
+                watchers[i].waitForConnected(RAPID_TIMEOUT);
+                reconnected++;
+            } catch (TimeoutException e) {
+                LOG.info("One reconnect fails due to insufficient tokens");
+            }
+        }
+        //each reconnect takes two tokens, we have 3, so only one reconnects
+        LOG.info("reconnected {}", reconnected);
+        Assert.assertEquals(1, reconnected);
+        shutdownQuorum();
+    }
 }
 }

+ 4 - 0
zookeeper-server/src/test/java/org/apache/zookeeper/server/PrepRequestProcessorTest.java

@@ -278,6 +278,10 @@ public class PrepRequestProcessorTest extends ClientBase {
             return 0;
             return 0;
         }
         }
 
 
+        @Override
+        public boolean isLocalSessionsEnabled() {
+            return false;
+        }
     }
     }
 
 
 }
 }