Kaynağa Gözat

ZOOKEEPER-3242: Add server side connecting throttling

Author: Jie Huang <jiehuang@fb.com>

Reviewers: fangmin@apache.org, andor@apache.org

Closes #769 from jhuan31/ZOOKEEPER-3242 and squashes the following commits:

c3ec81f4e [Jie Huang] refactoring
86cad39c4 [Jie Huang] Use a mock random number generator to make the unit test flaky-proof
a278504d4 [Jie Huang] Add unit tests for server-side connection throttling
fd966502b [Jie Huang] update doc for server-side connection throttling
2f1ed0b87 [Jie Huang] Fix FindBugs Warnings
a48b0fcb1 [Jie Huang] ZOOKEEPER-3242: Add server side connecting throttling
Jie Huang 6 yıl önce
ebeveyn
işleme
6688285033

+ 68 - 0
zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md

@@ -738,6 +738,74 @@ property, when available, is noted below.
     strategy from the configured minimum (fastleader.minNotificationInterval)
     and the configured maximum (this) for long elections.
 
+* *connectionMaxTokens* :
+    (Java system property: **zookeeper.connection_throttle_tokens**)
+    **New in 3.6.0:**
+    This is one of the parameters to tune the server-side connection throttler,
+    which is a token-based rate limiting mechanism with optional probabilistic
+    dropping.
+    This parameter defines the maximum number of tokens in the token-bucket.
+    When set to 0, throttling is disabled. Default is 0.
+
+* *connectionTokenFillTime* :
+    (Java system property: **zookeeper.connection_throttle_fill_time**)
+    **New in 3.6.0:**
+    This is one of the parameters to tune the server-side connection throttler,
+    which is a token-based rate limiting mechanism with optional probabilistic
+    dropping.
+    This parameter defines the interval in milliseconds when the token bucket is re-filled with
+    *connectionTokenFillCount* tokens. Default is 1.
+
+* *connectionTokenFillCount* :
+    (Java system property: **zookeeper.connection_throttle_fill_count**)
+    **New in 3.6.0:**
+    This is one of the parameters to tune the server-side connection throttler,
+    which is a token-based rate limiting mechanism with optional probabilistic
+    dropping.
+    This parameter defines the number of tokens to add to the token bucket every
+    *connectionTokenFillTime* milliseconds. Default is 1.
+
+* *connectionFreezeTime* :
+    (Java system property: **zookeeper.connection_throttle_freeze_time**)
+    **New in 3.6.0:**
+    This is one of the parameters to tune the server-side connection throttler,
+    which is a token-based rate limiting mechanism with optional probabilistic
+    dropping.
+    This parameter defines the interval in milliseconds when the dropping
+    probability is adjusted. When set to -1, probabilistic dropping is disabled.
+    Default is -1.     
+
+* *connectionDropIncrease* :
+    (Java system property: **zookeeper.connection_throttle_drop_increase**)
+    **New in 3.6.0:**
+    This is one of the parameters to tune the server-side connection throttler,
+    which is a token-based rate limiting mechanism with optional probabilistic
+    dropping.
+    This parameter defines the dropping probability to increase. The throttler
+    checks every *connectionFreezeTime* milliseconds and if the token bucket is
+    empty, the dropping probability will be increased by *connectionDropIncrease*.
+    The default is 0.02.
+
+* *connectionDropDecrease* :
+    (Java system property: **zookeeper.connection_throttle_drop_decrease**)
+    **New in 3.6.0:**
+    This is one of the parameters to tune the server-side connection throttler,
+    which is a token-based rate limiting mechanism with optional probabilistic
+    dropping.
+    This parameter defines the dropping probability to decrease. The throttler
+    checks every *connectionFreezeTime* milliseconds and if the token bucket has
+    more tokens than a threshold, the dropping probability will be decreased by
+    *connectionDropDecrease*. The threshold is *connectionMaxTokens* \*
+    *connectionDecreaseRatio*. The default is 0.002.
+
+* *connectionDecreaseRatio* :
+    (Java system property: **zookeeper.connection_throttle_decrease_ratio**)
+    **New in 3.6.0:**
+    This is one of the parameters to tune the server-side connection throttler,
+    which is a token-based rate limiting mechanism with optional probabilistic
+    dropping. This parameter defines the threshold to decrease the dropping
+    probability. The default is 0.
+
 <a name="sc_clusterOptions"></a>
 
 #### Cluster Options

+ 268 - 0
zookeeper-server/src/main/java/org/apache/zookeeper/server/BlueThrottle.java

@@ -0,0 +1,268 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server;
+
+import java.util.Random;
+
+import org.apache.zookeeper.common.Time;
+
+/**
+ * Implements a token-bucket based rate limiting mechanism with optional
+ * probabilistic dropping inspired by the BLUE queue management algorithm [1].
+ *
+ * The throttle provides the {@link #checkLimit(int)} method which provides
+ * a binary yes/no decision.
+ *
+ * The core token bucket algorithm starts with an initial set of tokens based
+ * on the <code>maxTokens</code> setting. Tokens are dispensed each
+ * {@link #checkLimit(int)} call, which fails if there are not enough tokens to
+ * satisfy a given request.
+ *
+ * The token bucket refills over time, providing <code>fillCount</code> tokens
+ * every <code>fillTime</code> milliseconds, capping at <code>maxTokens</code>.
+ *
+ * This design allows the throttle to allow short bursts to pass, while still
+ * capping the total number of requests per time interval.
+ *
+ * One issue with a pure token bucket approach for something like request or
+ * connection throttling is that the wall clock arrival time of requests affects
+ * the probability of a request being allowed to pass or not. Under constant
+ * load this can lead to request starvation for requests that constantly arrive
+ * later than the majority.
+ *
+ * In an attempt to combat this, this throttle can also provide probabilistic
+ * dropping. This is enabled anytime <code>freezeTime</code> is set to a value
+ * other than <code>-1</code>.
+ *
+ * The probabilistic algorithm starts with an initial drop probability of 0, and
+ * adjusts this probability roughly every <code>freezeTime</code> milliseconds.
+ * The first request after <code>freezeTime</code>, the algorithm checks the
+ * token bucket. If the token bucket is empty, the drop probability is increased
+ * by <code>dropIncrease</code> up to a maximum of <code>1</code>. Otherwise, if
+ * the bucket has a token deficit less than <code>decreasePoint * maxTokens</code>,
+ * the probability is decreased by <code>dropDecrease</code>.
+ *
+ * Given a call to {@link #checkLimit(int)}, requests are first dropped randomly
+ * based on the current drop probability, and only surviving requests are then
+ * checked against the token bucket.
+ *
+ * When under constant load, the probabilistic algorithm will adapt to a drop
+ * frequency that should keep requests within the token limit. When load drops,
+ * the drop probability will decrease, eventually returning to zero if possible.
+ *
+ * [1] "BLUE: A New Class of Active Queue Management Algorithms"
+ **/
+
+public class BlueThrottle {
+    private int maxTokens;
+    private int fillTime;
+    private int fillCount;
+    private int tokens;
+    private long lastTime;
+
+    private int freezeTime;
+    private long lastFreeze;
+    private double dropIncrease;
+    private double dropDecrease;
+    private double decreasePoint;
+    private double drop;
+
+    Random rng;
+
+    public static final String CONNECTION_THROTTLE_TOKENS = "zookeeper.connection_throttle_tokens";
+    public static final int DEFAULT_CONNECTION_THROTTLE_TOKENS;
+
+    public static final String CONNECTION_THROTTLE_FILL_TIME = "zookeeper.connection_throttle_fill_time";
+    public static final int DEFAULT_CONNECTION_THROTTLE_FILL_TIME;
+
+    public static final String CONNECTION_THROTTLE_FILL_COUNT = "zookeeper.connection_throttle_fill_count";
+    public static final int DEFAULT_CONNECTION_THROTTLE_FILL_COUNT;
+
+    public static final String CONNECTION_THROTTLE_FREEZE_TIME = "zookeeper.connection_throttle_freeze_time";
+    public static final int DEFAULT_CONNECTION_THROTTLE_FREEZE_TIME;
+
+    public static final String CONNECTION_THROTTLE_DROP_INCREASE = "zookeeper.connection_throttle_drop_increase";
+    public static final double DEFAULT_CONNECTION_THROTTLE_DROP_INCREASE;
+
+    public static final String CONNECTION_THROTTLE_DROP_DECREASE = "zookeeper.connection_throttle_drop_decrease";
+    public static final double DEFAULT_CONNECTION_THROTTLE_DROP_DECREASE;
+
+    public static final String CONNECTION_THROTTLE_DECREASE_RATIO = "zookeeper.connection_throttle_decrease_ratio";
+    public static final double DEFAULT_CONNECTION_THROTTLE_DECREASE_RATIO;
+
+
+    static {
+        DEFAULT_CONNECTION_THROTTLE_TOKENS = Integer.getInteger(CONNECTION_THROTTLE_TOKENS, 0);
+        DEFAULT_CONNECTION_THROTTLE_FILL_TIME = Integer.getInteger(CONNECTION_THROTTLE_FILL_TIME, 1);
+        DEFAULT_CONNECTION_THROTTLE_FILL_COUNT = Integer.getInteger(CONNECTION_THROTTLE_FILL_COUNT, 1);
+
+        DEFAULT_CONNECTION_THROTTLE_FREEZE_TIME = Integer.getInteger(CONNECTION_THROTTLE_FREEZE_TIME, -1);
+        DEFAULT_CONNECTION_THROTTLE_DROP_INCREASE = getDoubleProp(CONNECTION_THROTTLE_DROP_INCREASE, 0.02);
+        DEFAULT_CONNECTION_THROTTLE_DROP_DECREASE = getDoubleProp(CONNECTION_THROTTLE_DROP_DECREASE, 0.002);
+        DEFAULT_CONNECTION_THROTTLE_DECREASE_RATIO = getDoubleProp(CONNECTION_THROTTLE_DECREASE_RATIO, 0);
+    }
+
+    /* Varation of Integer.getInteger for real number properties */
+    private static double getDoubleProp(String name, double def) {
+        String val = System.getProperty(name);
+        if(val != null) {
+            return Double.parseDouble(val);
+        }
+        else {
+            return def;
+        }
+    }
+
+
+    public BlueThrottle() {
+        // Disable throttling by default (maxTokens = 0)
+        this.maxTokens = DEFAULT_CONNECTION_THROTTLE_TOKENS;
+        this.fillTime  = DEFAULT_CONNECTION_THROTTLE_FILL_TIME;
+        this.fillCount = DEFAULT_CONNECTION_THROTTLE_FILL_COUNT;
+        this.tokens = maxTokens;
+        this.lastTime = Time.currentElapsedTime();
+
+        // Disable BLUE throttling by default (freezeTime = -1)
+        this.freezeTime = DEFAULT_CONNECTION_THROTTLE_FREEZE_TIME;
+        this.lastFreeze = Time.currentElapsedTime();
+        this.dropIncrease = DEFAULT_CONNECTION_THROTTLE_DROP_INCREASE;
+        this.dropDecrease = DEFAULT_CONNECTION_THROTTLE_DROP_DECREASE;
+        this.decreasePoint = DEFAULT_CONNECTION_THROTTLE_DECREASE_RATIO;
+        this.drop = 0;
+
+        this.rng = new Random();
+    }
+
+    public synchronized void setMaxTokens(int max) {
+        int deficit = maxTokens - tokens;
+        maxTokens = max;
+        tokens = max - deficit;
+    }
+
+    public synchronized void setFillTime(int time) {
+        fillTime = time;
+    }
+
+    public synchronized void setFillCount(int count) {
+        fillCount = count;
+    }
+
+    public synchronized void setFreezeTime(int time) {
+        freezeTime = time;
+    }
+
+    public synchronized void setDropIncrease(double increase) {
+        dropIncrease = increase;
+    }
+
+    public synchronized void setDropDecrease(double decrease) {
+        dropDecrease = decrease;
+    }
+
+    public synchronized void setDecreasePoint(double ratio) {
+        decreasePoint = ratio;
+    }
+
+    public synchronized int getMaxTokens() {
+        return maxTokens;
+    }
+
+    public synchronized int getFillTime() {
+        return fillTime;
+    }
+
+    public synchronized int getFillCount() {
+        return fillCount;
+    }
+
+    public synchronized int getFreezeTime() {
+        return freezeTime;
+    }
+
+    public synchronized double getDropIncrease() {
+        return dropIncrease;
+    }
+
+    public synchronized double getDropDecrease() {
+        return dropDecrease;
+    }
+
+    public synchronized double getDecreasePoint() {
+        return decreasePoint;
+    }
+
+    public synchronized double getDropChance() {
+        return drop;
+    }
+
+    public synchronized int getDeficit() {
+        return maxTokens - tokens;
+    }
+
+    public synchronized boolean checkLimit(int need) {
+        // A maxTokens setting of zero disables throttling
+        if (maxTokens == 0)
+            return true;
+
+        long now = Time.currentElapsedTime();
+        long diff = now - lastTime;
+
+        if (diff > fillTime) {
+            int refill = (int)(diff * fillCount / fillTime);
+            tokens = Math.min(tokens + refill, maxTokens);
+            lastTime = now;
+        }
+
+        // A freeze time of -1 disables BLUE randomized throttling
+        if(freezeTime != -1) {
+            if(!checkBlue(now)) {
+                return false;
+            }
+        }
+
+        if (tokens < need) {
+            return false;
+        }
+
+        tokens -= need;
+        return true;
+    }
+
+    public synchronized boolean checkBlue(long now) {
+        int length = maxTokens - tokens;
+        int limit = maxTokens;
+        long diff = now - lastFreeze;
+        long threshold = Math.round(maxTokens * decreasePoint);
+
+        if (diff > freezeTime) {
+            if((length == limit) && (drop < 1)) {
+                drop = Math.min(drop + dropIncrease, 1);
+            }
+            else if ((length <= threshold) && (drop > 0)) {
+                drop = Math.max(drop - dropDecrease, 0);
+            }
+            lastFreeze = now;
+        }
+
+        if (rng.nextDouble() < drop) {
+            return false;
+        }
+        return true;
+    }
+};

+ 30 - 0
zookeeper-server/src/main/java/org/apache/zookeeper/server/ClientCnxnLimitException.java

@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server;
+
+/**
+ * Indicates that the number of client connections has exceeded some limit.
+ * @see org.apache.zookeeper.server.ClientCnxnLimit#checkLimit()
+ * @see org.apache.zookeeper.server.ClientCnxnLimit#checkLimit(int)
+ */
+public class ClientCnxnLimitException extends Exception {
+    public ClientCnxnLimitException() {
+        super("Connection throttle rejected connection");
+    }
+}

+ 10 - 2
zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxn.java

@@ -151,7 +151,7 @@ public class NIOServerCnxn extends ServerCnxn {
     }
 
     /** Read the request payload (everything following the length prefix) */
-    private void readPayload() throws IOException, InterruptedException {
+    private void readPayload() throws IOException, InterruptedException, ClientCnxnLimitException {
         if (incomingBuffer.remaining() != 0) { // have we read length bytes?
             int rc = sock.read(incomingBuffer); // sock is non-blocking, so ok
             if (rc < 0) {
@@ -360,6 +360,14 @@ public class NIOServerCnxn extends ServerCnxn {
             LOG.warn(e.getMessage());
             // expecting close to log session closure
             close();
+        } catch (ClientCnxnLimitException e) {
+            // Common case exception, print at debug level
+            ServerMetrics.CONNECTION_REJECTED.add(1);
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Exception causing close of session 0x"
+                          + Long.toHexString(sessionId) + ": " + e.getMessage());
+            }
+            close();
         } catch (IOException e) {
             LOG.warn("Exception causing close of session 0x"
                      + Long.toHexString(sessionId) + ": " + e.getMessage());
@@ -407,7 +415,7 @@ public class NIOServerCnxn extends ServerCnxn {
         }
     }
 
-    private void readConnectRequest() throws IOException, InterruptedException {
+    private void readConnectRequest() throws IOException, InterruptedException, ClientCnxnLimitException {
         if (!isZKServerRunning()) {
             throw new IOException("ZooKeeperServer not running");
         }

+ 1 - 0
zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxnFactory.java

@@ -310,6 +310,7 @@ public class NIOServerCnxnFactory extends ServerCnxnFactory {
                 acceptErrorLogger.flush();
             } catch (IOException e) {
                 // accept, maxClientCnxns, configureBlocking
+                ServerMetrics.CONNECTION_REJECTED.add(1);
                 acceptErrorLogger.rateLimitLog(
                     "Error accepting new connection: " + e.getMessage());
                 fastCloseSock(sc);

+ 7 - 0
zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java

@@ -486,6 +486,13 @@ public class NettyServerCnxn extends ServerCnxn {
         } catch(IOException e) {
             LOG.warn("Closing connection to " + getRemoteSocketAddress(), e);
             close();
+        } catch(ClientCnxnLimitException e) {
+            // Common case exception, print at debug level
+            ServerMetrics.CONNECTION_REJECTED.add(1);
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Closing connection to " + getRemoteSocketAddress(), e);
+            }
+            close();
         }
     }
 

+ 1 - 0
zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxnFactory.java

@@ -108,6 +108,7 @@ public class NettyServerCnxnFactory extends ServerCnxnFactory {
             InetAddress addr = ((InetSocketAddress) channel.remoteAddress())
                     .getAddress();
             if (maxClientCnxns > 0 && getClientCnxnCount(addr) >= maxClientCnxns) {
+                ServerMetrics.CONNECTION_REJECTED.add(1);
                 LOG.warn("Too many connections from {} - max is {}", addr,
                         maxClientCnxns);
                 channel.close();

+ 4 - 0
zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java

@@ -67,6 +67,10 @@ public enum ServerMetrics {
     SNAP_COUNT(new SimpleCounter("snap_count")),
     COMMIT_COUNT(new SimpleCounter("commit_count")),
     CONNECTION_REQUEST_COUNT(new SimpleCounter("connection_request_count")),
+    // Connection throttling related
+    CONNECTION_TOKEN_DEFICIT(new AvgMinMaxCounter("connection_token_deficit")),
+    CONNECTION_REJECTED(new SimpleCounter("connection_rejected")),
+
     BYTES_RECEIVED_COUNT(new SimpleCounter("bytes_received_count")),
 
     RESPONSE_PACKET_CACHE_HITS(new SimpleCounter("response_packet_cache_hits")),

+ 23 - 1
zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java

@@ -164,6 +164,9 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
         LOG.info(INT_BUFFER_STARTING_SIZE_BYTES + " = " + intBufferStartingSizeBytes);
     }
 
+    // Connection throttling
+    private BlueThrottle connThrottle;
+
     void removeCnxn(ServerCnxn cnxn) {
         zkDb.removeCnxn(cnxn);
     }
@@ -196,7 +199,11 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
         setMinSessionTimeout(minSessionTimeout);
         setMaxSessionTimeout(maxSessionTimeout);
         listener = new ZooKeeperServerListenerImpl(this);
+
         readResponseCache = new ResponseCache();
+
+        connThrottle = new BlueThrottle();
+
         LOG.info("Created server with tickTime " + tickTime
                 + " minSessionTimeout " + getMinSessionTimeout()
                 + " maxSessionTimeout " + getMaxSessionTimeout()
@@ -219,6 +226,10 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
         return serverStats;
     }
 
+    public BlueThrottle connThrottle() {
+        return connThrottle;
+    }
+
     public void dumpConf(PrintWriter pwriter) {
         pwriter.print("clientPort=");
         pwriter.println(getClientPort());
@@ -1043,7 +1054,18 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
         return zkDb.getEphemerals();
     }
 
-    public void processConnectRequest(ServerCnxn cnxn, ByteBuffer incomingBuffer) throws IOException {
+    public double getConnectionDropChance() {
+        return connThrottle.getDropChance();
+    }
+
+    public void processConnectRequest(ServerCnxn cnxn, ByteBuffer incomingBuffer)
+        throws IOException, ClientCnxnLimitException {
+
+        if (connThrottle.checkLimit(1) == false) {
+            throw new ClientCnxnLimitException();
+        }
+        ServerMetrics.CONNECTION_TOKEN_DEFICIT.add(connThrottle.getDeficit());
+
         BinaryInputArchive bia = BinaryInputArchive.getArchive(new ByteBufferInputStream(incomingBuffer));
         ConnectRequest connReq = new ConnectRequest();
         connReq.deserialize(bia, "connect");

+ 72 - 1
zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerBean.java

@@ -167,7 +167,7 @@ public class ZooKeeperServerBean implements ZooKeeperServerMXBean, ZKMBeanInfo {
     public String getSecureClientAddress() {
         if (zks.secureServerCnxnFactory != null) {
             return String.format("%s:%d", zks.secureServerCnxnFactory
-                    .getLocalAddress().getHostString(),
+                            .getLocalAddress().getHostString(),
                     zks.secureServerCnxnFactory.getLocalPort());
         }
         return "";
@@ -207,4 +207,75 @@ public class ZooKeeperServerBean implements ZooKeeperServerMXBean, ZKMBeanInfo {
     public void setResponseCachingEnabled(boolean isEnabled) {
         zks.setResponseCachingEnabled(isEnabled);
     }
+    
+    // Connection throttling settings
+    ///////////////////////////////////////////////////////////////////////////
+
+    public int getConnectionMaxTokens() {
+        return zks.connThrottle().getMaxTokens();
+    }
+
+    public void setConnectionMaxTokens(int val) {
+        zks.connThrottle().setMaxTokens(val);
+    }
+
+    ///////////////////////////////////////////////////////////////////////////
+
+    public int getConnectionTokenFillTime() {
+        return zks.connThrottle().getFillTime();
+    }
+
+    public void setConnectionTokenFillTime(int val) {
+        zks.connThrottle().setFillTime(val);
+    }
+
+    ///////////////////////////////////////////////////////////////////////////
+
+    public int getConnectionTokenFillCount() {
+        return zks.connThrottle().getFillCount();
+    }
+
+    public void setConnectionTokenFillCount(int val) {
+        zks.connThrottle().setFillCount(val);
+    }
+
+    ///////////////////////////////////////////////////////////////////////////
+
+    public int getConnectionFreezeTime() {
+        return zks.connThrottle().getFreezeTime();
+    }
+
+    public void setConnectionFreezeTime(int val) {
+        zks.connThrottle().setFreezeTime(val);
+    }
+
+    ///////////////////////////////////////////////////////////////////////////
+
+    public double getConnectionDropIncrease() {
+        return zks.connThrottle().getDropIncrease();
+    }
+
+    public void setConnectionDropIncrease(double val) {
+        zks.connThrottle().setDropIncrease(val);
+    }
+
+    ///////////////////////////////////////////////////////////////////////////
+
+    public double getConnectionDropDecrease() {
+        return zks.connThrottle().getDropDecrease();
+    }
+
+    public void setConnectionDropDecrease(double val) {
+        zks.connThrottle().setDropDecrease(val);
+    }
+
+    ///////////////////////////////////////////////////////////////////////////
+
+    public double getConnectionDecreaseRatio() {
+        return zks.connThrottle().getDecreasePoint();
+    }
+
+    public void setConnectionDecreaseRatio(double val) {
+        zks.connThrottle().setDecreasePoint(val);
+    }
 }

+ 22 - 0
zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerMXBean.java

@@ -98,6 +98,28 @@ public interface ZooKeeperServerMXBean {
     public boolean getResponseCachingEnabled();
     public void setResponseCachingEnabled(boolean isEnabled);
 
+    /* Connection throttling settings */
+    public int getConnectionMaxTokens();
+    public void setConnectionMaxTokens(int val);
+
+    public int getConnectionTokenFillTime();
+    public void setConnectionTokenFillTime(int val);
+
+    public int getConnectionTokenFillCount();
+    public void setConnectionTokenFillCount(int val);
+
+    public int getConnectionFreezeTime();
+    public void setConnectionFreezeTime(int val);
+
+    public double getConnectionDropIncrease();
+    public void setConnectionDropIncrease(double val);
+
+    public double getConnectionDropDecrease();
+    public void setConnectionDropDecrease(double val);
+
+    public double getConnectionDecreaseRatio();
+    public void setConnectionDecreaseRatio(double val);
+
     /**
      * Reset packet and latency statistics 
      */

+ 1 - 0
zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/Commands.java

@@ -354,6 +354,7 @@ public class Commands {
             OSMXBean osMbean = new OSMXBean();
             response.put("open_file_descriptor_count", osMbean.getOpenFileDescriptorCount());
             response.put("max_file_descriptor_count", osMbean.getMaxFileDescriptorCount());
+            response.put("connection_drop_probability", zkServer.getConnectionDropChance());
 
             response.put("last_client_response_size", stats.getClientResponseStats().getLastBufferSize());
             response.put("max_client_response_size", stats.getClientResponseStats().getMaxBufferSize());

+ 157 - 0
zookeeper-server/src/test/java/org/apache/zookeeper/server/BlueThrottleTest.java

@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zookeeper.server;
+
+import org.apache.zookeeper.ZKTestCase;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Random;
+
+public class BlueThrottleTest extends ZKTestCase {
+    private static final Logger LOG = LoggerFactory.getLogger(BlueThrottleTest.class);
+
+    class MockRandom extends Random {
+        int flag = 0;
+        BlueThrottle throttle;
+
+        @Override
+        public double nextDouble() {
+            if (throttle.getDropChance() > 0) {
+                flag = 1 - flag;
+                return flag;
+            } else {
+                return 1;
+            }
+        }
+    }
+
+    class BlueThrottleWithMockRandom extends BlueThrottle {
+        public BlueThrottleWithMockRandom(MockRandom random) {
+            super();
+            this.rng = random;
+            random.throttle = this;
+        }
+    }
+
+    @Test
+    public void testThrottleDisabled() {
+        BlueThrottle throttler = new BlueThrottle();
+        Assert.assertTrue("Throttle should be disabled by default", throttler.checkLimit(1));
+    }
+
+    @Test
+    public void testThrottleWithoutRefill() {
+        BlueThrottle throttler = new BlueThrottle();
+        throttler.setMaxTokens(1);
+        throttler.setFillTime(2000);
+        Assert.assertTrue("First request should be allowed", throttler.checkLimit(1));
+        Assert.assertFalse("Second request should be denied", throttler.checkLimit(1));
+    }
+
+    @Test
+    public void testThrottleWithRefill() throws InterruptedException {
+        BlueThrottle throttler = new BlueThrottle();
+        throttler.setMaxTokens(1);
+        throttler.setFillTime(500);
+        Assert.assertTrue("First request should be allowed", throttler.checkLimit(1));
+        Assert.assertFalse("Second request should be denied", throttler.checkLimit(1));
+
+        //wait for the bucket to be refilled
+        Thread.sleep(750);
+        Assert.assertTrue("Third request should be allowed since we've got a new token", throttler.checkLimit(1));
+    }
+
+    @Test
+    public void testThrottleWithoutRandomDropping() throws InterruptedException {
+        int maxTokens = 5;
+        BlueThrottle throttler = new BlueThrottleWithMockRandom(new MockRandom());
+        throttler.setMaxTokens(maxTokens);
+        throttler.setFillCount(maxTokens);
+        throttler.setFillTime(1000);
+
+        for (int i=0;i<maxTokens;i++) {
+            throttler.checkLimit(1);
+        }
+        Assert.assertEquals("All tokens should be used up by now", throttler.getMaxTokens(), throttler.getDeficit());
+
+        Thread.sleep(110);
+        throttler.checkLimit(1);
+        Assert.assertFalse("Dropping probability should still be zero", throttler.getDropChance()>0);
+
+        //allow bucket to be refilled
+        Thread.sleep(1500);
+
+        for (int i=0;i<maxTokens;i++) {
+            Assert.assertTrue("The first " + maxTokens + " requests should be allowed", throttler.checkLimit(1));
+        }
+
+        for (int i=0;i<maxTokens;i++) {
+            Assert.assertFalse("The latter " + maxTokens + " requests should be denied", throttler.checkLimit(1));
+        }
+    }
+
+    @Test
+    public void testThrottleWithRandomDropping() throws InterruptedException {
+        int maxTokens = 5;
+        BlueThrottle throttler = new BlueThrottleWithMockRandom(new MockRandom());
+        throttler.setMaxTokens(maxTokens);
+        throttler.setFillCount(maxTokens);
+        throttler.setFillTime(1000);
+        throttler.setFreezeTime(100);
+        throttler.setDropIncrease(0.5);
+
+        for (int i=0;i<maxTokens;i++)
+            throttler.checkLimit(1);
+        Assert.assertEquals("All tokens should be used up by now", throttler.getMaxTokens(), throttler.getDeficit());
+
+        Thread.sleep(120);
+        //this will trigger dropping probability being increased
+        throttler.checkLimit(1);
+        Assert.assertTrue("Dropping probability should be increased", throttler.getDropChance()>0);
+        LOG.info("Dropping probability is {}", throttler.getDropChance());
+
+        //allow bucket to be refilled
+        Thread.sleep(1100);
+        LOG.info("Bucket is refilled with {} tokens.", maxTokens);
+
+        int accepted = 0;
+        for (int i=0;i<maxTokens;i++) {
+            if (throttler.checkLimit(1)) {
+                accepted ++;
+            }
+        }
+
+        LOG.info("Send {} requests, {} are accepted", maxTokens, accepted);
+        Assert.assertTrue("The dropping should be distributed", accepted<maxTokens);
+
+        accepted = 0;
+
+        for (int i=0;i<maxTokens;i++) {
+            if (throttler.checkLimit(1)) {
+                accepted ++;
+            }
+        }
+
+        LOG.info("Send another {} requests, {} are accepted", maxTokens, accepted);
+        Assert.assertTrue("Later requests should have a chance", accepted > 0);
+    }
+}

+ 2 - 1
zookeeper-server/src/test/java/org/apache/zookeeper/server/admin/CommandsTest.java

@@ -190,7 +190,8 @@ public class CommandsTest extends ClientBase {
                 new Field("min_client_response_size", Integer.class),
                 new Field("uptime", Long.class),
                 new Field("global_sessions", Long.class),
-                new Field("local_sessions", Long.class)
+                new Field("local_sessions", Long.class),
+                new Field("connection_drop_probability", Double.class)
         ));
         for (String metric : ServerMetrics.getAllValues().keySet()) {
             if (metric.startsWith("avg_")) {