소스 검색

HADOOP-9137. Support connection limiting in IPC server. Contributed by Kihwal Lee.

Kihwal Lee 10 년 전
부모
커밋
8dc59cb9e0

+ 2 - 0
hadoop-common-project/hadoop-common/CHANGES.txt

@@ -519,6 +519,8 @@ Release 2.7.0 - UNRELEASED
     HADOOP-11441. Hadoop-azure: Change few methods scope to public.
     (Shashank Khandelwal via cnauroth)
 
+    HADOOP-9137. Support connection limiting in IPC server (kihwal)
+
   OPTIMIZATIONS
 
     HADOOP-11323. WritableComparator#compare keeps reference to byte array.

+ 5 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java

@@ -227,6 +227,11 @@ public class CommonConfigurationKeysPublic {
     "ipc.server.tcpnodelay";
   /** Default value for IPC_SERVER_TCPNODELAY_KEY */
   public static final boolean IPC_SERVER_TCPNODELAY_DEFAULT = true;
+  /** See <a href="{@docRoot}/../core-default.html">core-default.xml</a> */
+  public static final String  IPC_SERVER_MAX_CONNECTIONS_KEY =
+    "ipc.server.max.connections";
+  /** Default value for IPC_SERVER_MAX_CONNECTIONS_KEY */
+  public static final int     IPC_SERVER_MAX_CONNECTIONS_DEFAULT = 0;
 
   /** See <a href="{@docRoot}/../core-default.html">core-default.xml</a> */
   public static final String  HADOOP_RPC_SOCKET_FACTORY_CLASS_DEFAULT_KEY =

+ 19 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java

@@ -749,6 +749,13 @@ public abstract class Server {
         
         Reader reader = getReader();
         Connection c = connectionManager.register(channel);
+        // If the connectionManager can't take it, close the connection.
+        if (c == null) {
+          if (channel.isOpen()) {
+            IOUtils.cleanup(null, channel);
+          }
+          continue;
+        }
         key.attach(c);  // so closeCurrentConnection can get the object
         reader.addConnection(c);
       }
@@ -2731,6 +2738,7 @@ public abstract class Server {
     final private int idleScanInterval;
     final private int maxIdleTime;
     final private int maxIdleToClose;
+    final private int maxConnections;
     
     ConnectionManager() {
       this.idleScanTimer = new Timer(
@@ -2747,6 +2755,9 @@ public abstract class Server {
       this.maxIdleToClose = conf.getInt(
           CommonConfigurationKeysPublic.IPC_CLIENT_KILL_MAX_KEY,
           CommonConfigurationKeysPublic.IPC_CLIENT_KILL_MAX_DEFAULT);
+      this.maxConnections = conf.getInt(
+          CommonConfigurationKeysPublic.IPC_SERVER_MAX_CONNECTIONS_KEY,
+          CommonConfigurationKeysPublic.IPC_SERVER_MAX_CONNECTIONS_DEFAULT);
       // create a set with concurrency -and- a thread-safe iterator, add 2
       // for listener and idle closer threads
       this.connections = Collections.newSetFromMap(
@@ -2774,11 +2785,19 @@ public abstract class Server {
       return count.get();
     }
 
+    boolean isFull() {
+      // The check is disabled when maxConnections <= 0.
+      return ((maxConnections > 0) && (size() >= maxConnections));
+    }
+
     Connection[] toArray() {
       return connections.toArray(new Connection[0]);
     }
 
     Connection register(SocketChannel channel) {
+      if (isFull()) {
+        return null;
+      }
       Connection connection = new Connection(channel, Time.now());
       add(connection);
       if (LOG.isDebugEnabled()) {

+ 11 - 0
hadoop-common-project/hadoop-common/src/main/resources/core-default.xml

@@ -1677,4 +1677,15 @@ for ldap providers in the same way as above does.
   </description>
 </property>
 
+ <property>
+  <name>ipc.server.max.connections</name>
+  <value>0</value>
+  <description>The maximum number of concurrent connections a server is allowed
+    to accept. If this limit is exceeded, incoming connections will first fill
+    the listen queue and then may go to an OS-specific listen overflow queue. 
+    The client may fail or timeout, but the server can avoid running out of file
+    descriptors using this feature. 0 means no limit.
+  </description>
+</property>
+
 </configuration>

+ 51 - 0
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java

@@ -1184,6 +1184,57 @@ public class TestIPC {
     }
   }
 
+  @Test
+  public void testMaxConnections() throws Exception {
+    conf.setInt("ipc.server.max.connections", 5);
+    Server server = null;
+    Thread connectors[] = new Thread[10];
+
+    try {
+      server = new TestServer(3, false);
+      final InetSocketAddress addr = NetUtils.getConnectAddress(server);
+      server.start();
+      assertEquals(0, server.getNumOpenConnections());
+
+      for (int i = 0; i < 10; i++) {
+        connectors[i] = new Thread() {
+          @Override
+          public void run() {
+            Socket sock = null;
+            try {
+              sock = NetUtils.getDefaultSocketFactory(conf).createSocket();
+              NetUtils.connect(sock, addr, 3000);
+              try {
+                Thread.sleep(4000);
+              } catch (InterruptedException ie) { }
+            } catch (IOException ioe) {
+            } finally {
+              if (sock != null) {
+                try {
+                  sock.close();
+                } catch (IOException ioe) { }
+              }
+            }
+          }
+        };
+        connectors[i].start();
+      }
+
+      Thread.sleep(1000);
+      // server should only accept up to 5 connections
+      assertEquals(5, server.getNumOpenConnections());
+
+      for (int i = 0; i < 10; i++) {
+        connectors[i].join();
+      }
+    } finally {
+      if (server != null) {
+        server.stop();
+      }
+      conf.setInt("ipc.server.max.connections", 0);
+    }
+  }
+
   private void assertRetriesOnSocketTimeouts(Configuration conf,
       int maxTimeoutRetries) throws IOException {
     SocketFactory mockFactory = Mockito.mock(SocketFactory.class);