瀏覽代碼

ZOOKEEPER-2238: Support limiting the maximum number of connections/clients to a zookeeper server

Support limiting the maximum number of connections/clients to a zookeeper server

Author: sujithsimon22 <sujith.abraham.simon@huawei.com>

Reviewers: Justin Mao Ling <maoling199210191@sina.com>, Brian Nixon <enixon@apache.org>, Edward Ribeiro <edward.ribeiro@gmail.com>, Enrico Olivelli <eolivelli@apache.org>, Mohammad Arshad <arshad@apache.org>

Closes #1108 from sujithsimon22/ZOOKEEPER-2238
Sujith Simon 5 年之前
父節點
當前提交
df26792932

+ 9 - 0
zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md

@@ -667,6 +667,15 @@ property, when available, is noted below.
     recommended to set the value to N * **preAllocSize**
     where N >= 2.
 
+* *maxCnxns* :
+    (Java system property: **zookeeper.maxCnxns**)
+    Limits the total number of concurrent connections that can be made to a
+    zookeeper server (per client Port of each server ). This is used to prevent certain
+    classes of DoS attacks. The default is 0 and setting it to 0 entirely removes
+    the limit on total number of concurrent connections.  Accounting for the
+    number of connections for serverCnxnFactory and a secureServerCnxnFactory is done
+    separately, so a peer is allowed to host up to 2*maxCnxns provided they are of appropriate types.
+
 * *maxClientCnxns* :
     (No Java system property)
     Limits the number of concurrent connections (at the socket

+ 4 - 0
zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxnFactory.java

@@ -273,6 +273,9 @@ public class NIOServerCnxnFactory extends ServerCnxnFactory {
             try {
                 sc = acceptSocket.accept();
                 accepted = true;
+                if (limitTotalNumberOfCnxns()) {
+                    throw new IOException("Too many connections max allowed is " + maxCnxns);
+                }
                 InetAddress ia = sc.socket().getInetAddress();
                 int cnxncount = getClientCnxnCount(ia);
 
@@ -634,6 +637,7 @@ public class NIOServerCnxnFactory extends ServerCnxnFactory {
         configureSaslLogin();
 
         maxClientCnxns = maxcc;
+        initMaxCnxns();
         sessionlessCnxnTimeout = Integer.getInteger(ZOOKEEPER_NIO_SESSIONLESS_CNXN_TIMEOUT, 10000);
         // We also use the sessionlessCnxnTimeout as expiring interval for
         // cnxnExpiryQueue. These don't need to be the same, but the expiring

+ 6 - 0
zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxnFactory.java

@@ -186,6 +186,11 @@ public class NettyServerCnxnFactory extends ServerCnxnFactory {
             }
 
             final Channel channel = ctx.channel();
+            if (limitTotalNumberOfCnxns()) {
+                ServerMetrics.getMetrics().CONNECTION_REJECTED.add(1);
+                channel.close();
+                return;
+            }
             InetAddress addr = ((InetSocketAddress) channel.remoteAddress()).getAddress();
             if (maxClientCnxns > 0 && getClientCnxnCount(addr) >= maxClientCnxns) {
                 ServerMetrics.getMetrics().CONNECTION_REJECTED.add(1);
@@ -524,6 +529,7 @@ public class NettyServerCnxnFactory extends ServerCnxnFactory {
     @Override
     public void configure(InetSocketAddress addr, int maxClientCnxns, int backlog, boolean secure) throws IOException {
         configureSaslLogin();
+        initMaxCnxns();
         localAddress = addr;
         this.maxClientCnxns = maxClientCnxns;
         this.secure = secure;

+ 41 - 0
zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerCnxnFactory.java

@@ -40,6 +40,8 @@ import org.slf4j.LoggerFactory;
 public abstract class ServerCnxnFactory {
 
     public static final String ZOOKEEPER_SERVER_CNXN_FACTORY = "zookeeper.serverCnxnFactory";
+    private static final String ZOOKEEPER_MAX_CONNECTION = "zookeeper.maxCnxns";
+    public static final int ZOOKEEPER_MAX_CONNECTION_DEFAULT = 0;
 
     private static final Logger LOG = LoggerFactory.getLogger(ServerCnxnFactory.class);
 
@@ -51,6 +53,9 @@ public abstract class ServerCnxnFactory {
      */
     static final ByteBuffer closeConn = ByteBuffer.allocate(0);
 
+    // total number of connections accepted by the ZooKeeper server
+    protected int maxCnxns;
+
     // sessionMap is used by closeSession()
     final ConcurrentHashMap<Long, ServerCnxn> sessionMap = new ConcurrentHashMap<Long, ServerCnxn>();
 
@@ -287,4 +292,40 @@ public abstract class ServerCnxnFactory {
         return loginUser;
     }
 
+    /**
+     * Maximum number of connections allowed in the ZooKeeper system
+     */
+    public int getMaxCnxns() {
+        return maxCnxns;
+    }
+
+    protected void initMaxCnxns() {
+        maxCnxns = Integer.getInteger(ZOOKEEPER_MAX_CONNECTION, ZOOKEEPER_MAX_CONNECTION_DEFAULT);
+        if (maxCnxns < 0) {
+            maxCnxns = ZOOKEEPER_MAX_CONNECTION_DEFAULT;
+            LOG.warn("maxCnxns should be greater than or equal to 0, using default vlaue {}.",
+                    ZOOKEEPER_MAX_CONNECTION_DEFAULT);
+        } else if (maxCnxns == ZOOKEEPER_MAX_CONNECTION_DEFAULT) {
+            LOG.warn("maxCnxns is not configured, using default value {}.",
+                    ZOOKEEPER_MAX_CONNECTION_DEFAULT);
+        } else {
+            LOG.info("maxCnxns configured value is {}.", maxCnxns);
+        }
+    }
+
+    /**
+     * Ensure total number of connections are less than the maxCnxns
+     */
+    protected boolean limitTotalNumberOfCnxns() {
+        if (maxCnxns <= 0) {
+            // maxCnxns limit is disabled
+            return false;
+        }
+        int cnxns = getNumAliveConnections();
+        if (cnxns >= maxCnxns) {
+            LOG.error("Too many connections " + cnxns + " - max is " + maxCnxns);
+            return true;
+        }
+        return false;
+    }
 }

+ 35 - 0
zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerCnxnHelper.java

@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server;
+
+public class ServerCnxnHelper {
+
+    /** gets maximum number of connections in ZooKeeper */
+    public static int getMaxCnxns(ServerCnxnFactory secureServerCnxnFactory, ServerCnxnFactory serverCnxnFactory) {
+        if (serverCnxnFactory != null) {
+            return serverCnxnFactory.getMaxCnxns();
+        }
+        if (secureServerCnxnFactory != null) {
+            return secureServerCnxnFactory.getMaxCnxns();
+        }
+        // default
+        return ServerCnxnFactory.ZOOKEEPER_MAX_CONNECTION_DEFAULT;
+    }
+
+}

+ 4 - 0
zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerBean.java

@@ -403,4 +403,8 @@ public class ZooKeeperServerBean implements ZooKeeperServerMXBean, ZKMBeanInfo {
     public void setLargeRequestThreshold(int threshold) {
         zks.setLargeRequestThreshold(threshold);
     }
+
+    public int getMaxCnxns() {
+        return ServerCnxnHelper.getMaxCnxns(zks.secureServerCnxnFactory, zks.serverCnxnFactory);
+    }
 }

+ 5 - 0
zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerMXBean.java

@@ -221,4 +221,9 @@ public interface ZooKeeperServerMXBean {
     int getMaxBatchSize();
     void setMaxBatchSize(int size);
 
+    /**
+     * @return Current maxCnxns allowed to a single ZooKeeper server
+     */
+   int getMaxCnxns();
+
 }

+ 5 - 0
zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LocalPeerBean.java

@@ -19,6 +19,7 @@
 package org.apache.zookeeper.server.quorum;
 
 import static org.apache.zookeeper.common.NetUtils.formatInetAddr;
+import org.apache.zookeeper.server.ServerCnxnHelper;
 
 /**
  * Implementation of the local peer MBean interface.
@@ -122,4 +123,8 @@ public class LocalPeerBean extends ServerBean implements LocalPeerMXBean {
         return peer.isLeader(peer.getId());
     }
 
+    @Override
+    public int getMaxCnxns() {
+        return ServerCnxnHelper.getMaxCnxns(peer.secureCnxnFactory, peer.cnxnFactory);
+    }
 }

+ 4 - 0
zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LocalPeerMXBean.java

@@ -120,4 +120,8 @@ public interface LocalPeerMXBean extends ServerMXBean {
      */
     boolean isLeader();
 
+    /**
+     * @return Current maxCnxns allowed to a single ZooKeeper server
+     */
+    int getMaxCnxns();
 }

+ 177 - 0
zookeeper-server/src/test/java/org/apache/zookeeper/server/ZooKeeperServerMaxCnxnsTest.java

@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zookeeper.server;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeoutException;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.PortAssignment;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.server.quorum.QuorumPeerTestBase;
+import org.apache.zookeeper.test.ClientBase;
+import org.apache.zookeeper.test.ClientBase.CountdownWatcher;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ZooKeeperServerMaxCnxnsTest extends QuorumPeerTestBase {
+
+    protected static final Logger LOG = LoggerFactory.getLogger(ZooKeeperServerMaxCnxnsTest.class);
+    private static int SERVER_COUNT = 3;
+    private MainThread[] mt;
+    private ZooKeeper[] clients;
+
+    /**
+     * <pre>
+     * Test case for https://issues.apache.org/jira/browse/ZOOKEEPER-2238.
+     * Support limiting the maximum number of connections/clients to a ZooKeeper server.
+     * </pre>
+     */
+
+    @Test(timeout = 120000)
+    public void testMaxZooKeeperClientsWithNIOServerCnxnFactory() throws Exception {
+        String serverCnxnFactory = "org.apache.zookeeper.server.NIOServerCnxnFactory";
+        testMaxZooKeeperClients(serverCnxnFactory);
+    }
+
+    @Test(timeout = 120000)
+    public void testMaxZooKeeperClientsWithNettyServerCnxnFactory() throws Exception {
+        String serverCnxnFactory = "org.apache.zookeeper.server.NettyServerCnxnFactory";
+        testMaxZooKeeperClients(serverCnxnFactory);
+    }
+
+    private void testMaxZooKeeperClients(String serverCnxnFactory) throws Exception {
+        final int clientPorts[] = new int[SERVER_COUNT];
+        int maxCnxns = 2;
+        StringBuilder sb = new StringBuilder();
+        sb.append("maxCnxns=" + maxCnxns + "\n");
+        sb.append("serverCnxnFactory=" + serverCnxnFactory + "\n");
+        String server;
+
+        for (int i = 0; i < SERVER_COUNT; i++) {
+            clientPorts[i] = PortAssignment.unique();
+            server = "server." + i + "=127.0.0.1:" + PortAssignment.unique() + ":"
+                    + PortAssignment.unique() + ":participant;127.0.0.1:" + clientPorts[i];
+            sb.append(server + "\n");
+        }
+        String currentQuorumCfgSection = sb.toString();
+        MainThread mt[] = new MainThread[SERVER_COUNT];
+
+        // start 3 servers
+        for (int i = 0; i < SERVER_COUNT; i++) {
+            mt[i] = new MainThread(i, clientPorts[i], currentQuorumCfgSection, false);
+            mt[i].start();
+        }
+
+        // ensure all servers started
+        for (int i = 0; i < SERVER_COUNT; i++) {
+            Assert.assertTrue("waiting for server " + i + " being up", ClientBase
+                    .waitForServerUp("127.0.0.1:" + clientPorts[i], ClientBase.CONNECTION_TIMEOUT));
+        }
+
+        int maxAllowedConnection = maxCnxns * SERVER_COUNT;
+        String cxnString = getCxnString(clientPorts);
+
+        final CountDownLatch countDownLatch = new CountDownLatch(maxAllowedConnection);
+        ZooKeeper[] clients = new ZooKeeper[maxAllowedConnection];
+        Watcher watcher = new Watcher() {
+
+            @Override
+            public void process(WatchedEvent event) {
+                if (event.getState() == Event.KeeperState.SyncConnected) {
+                    countDownLatch.countDown();
+                }
+            }
+        };
+        for (int i = 0; i < maxAllowedConnection; i++) {
+            clients[i] = new ZooKeeper(cxnString, ClientBase.CONNECTION_TIMEOUT, watcher);
+            Thread.sleep(100);
+        }
+        countDownLatch.await();
+        // reaching this point indicates that all maxAllowedConnection connected
+
+        // No more client to be allowed to connect now as we have reached the
+        // max connections
+        CountdownWatcher cdw = new CountdownWatcher();
+        ZooKeeper extraClient = new ZooKeeper(cxnString, ClientBase.CONNECTION_TIMEOUT, cdw);
+        try {
+            cdw.waitForConnected(ClientBase.CONNECTION_TIMEOUT / 2);
+            fail("Client is not supposed to get connected as max connection already reached.");
+        } catch (TimeoutException e) {
+            extraClient.close();
+        }
+
+        // lets close one already connected client
+        clients[0].close();
+
+        // Now extra client must automatically get connected
+        cdw = new CountdownWatcher();
+        extraClient = new ZooKeeper(cxnString, ClientBase.CONNECTION_TIMEOUT, cdw);
+        cdw.waitForConnected(ClientBase.CONNECTION_TIMEOUT);
+
+        // verify some basic operation
+        String create = extraClient.create("/test", "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
+                CreateMode.PERSISTENT);
+        assertEquals("/test", create);
+
+        // cleanup
+        extraClient.close();
+    }
+
+    private String getCxnString(int[] clientPorts) {
+        StringBuilder builder = new StringBuilder();
+        for (int i = 0; i < clientPorts.length; i++) {
+            builder.append("127.0.0.1:" + clientPorts[i]);
+            if (i != clientPorts.length - 1) {
+                builder.append(",");
+            }
+        }
+        return builder.toString();
+    }
+
+    @After
+    public void tearDown() {
+        // stop all clients
+        if (clients != null) {
+            for (ZooKeeper zooKeeper : clients) {
+                try {
+                    zooKeeper.close();
+                } catch (InterruptedException e) {
+                    LOG.warn("ZooKeeper interrupted while closing it.", e);
+                }
+            }
+        }
+        // stop all severs
+        if (mt != null) {
+            for (int i = 0; i < SERVER_COUNT; i++) {
+                try {
+                    mt[i].shutdown();
+                } catch (InterruptedException e) {
+                    LOG.warn("Quorum Peer interrupted while shutting it down", e);
+                }
+            }
+        }
+    }
+}