|
@@ -32,6 +32,7 @@ import java.nio.channels.ServerSocketChannel;
|
|
import java.nio.channels.SocketChannel;
|
|
import java.nio.channels.SocketChannel;
|
|
import java.util.ArrayList;
|
|
import java.util.ArrayList;
|
|
import java.util.Collections;
|
|
import java.util.Collections;
|
|
|
|
+import java.util.HashMap;
|
|
import java.util.HashSet;
|
|
import java.util.HashSet;
|
|
import java.util.Iterator;
|
|
import java.util.Iterator;
|
|
import java.util.LinkedList;
|
|
import java.util.LinkedList;
|
|
@@ -84,21 +85,42 @@ public class NIOServerCnxn implements Watcher, ServerCnxn {
|
|
ByteBuffer directBuffer = ByteBuffer.allocateDirect(64 * 1024);
|
|
ByteBuffer directBuffer = ByteBuffer.allocateDirect(64 * 1024);
|
|
|
|
|
|
HashSet<NIOServerCnxn> cnxns = new HashSet<NIOServerCnxn>();
|
|
HashSet<NIOServerCnxn> cnxns = new HashSet<NIOServerCnxn>();
|
|
|
|
+ HashMap<InetAddress, Set<NIOServerCnxn>> ipMap = new HashMap<InetAddress, Set<NIOServerCnxn>>( );
|
|
|
|
|
|
int outstandingLimit = 1;
|
|
int outstandingLimit = 1;
|
|
|
|
|
|
- /** Create the factory, startup(zks) must be called subsequently.
|
|
|
|
- * @param port listener port
|
|
|
|
|
|
+ int maxClientCnxns = 10;
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Construct a new server connection factory which will accept an unlimited number
|
|
|
|
+ * of concurrent connections from each client (up to the file descriptor
|
|
|
|
+ * limits of the operating system). startup(zks) must be called subsequently.
|
|
|
|
+ * @param port
|
|
* @throws IOException
|
|
* @throws IOException
|
|
*/
|
|
*/
|
|
public Factory(int port) throws IOException {
|
|
public Factory(int port) throws IOException {
|
|
|
|
+ this(port,0);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Constructs a new server connection factory where the number of concurrent connections
|
|
|
|
+ * from a single IP address is limited to maxcc (or unlimited if 0).
|
|
|
|
+ * startup(zks) must be called subsequently.
|
|
|
|
+ * @param port - the port to listen on for connections.
|
|
|
|
+ * @param maxcc - the number of concurrent connections allowed from a single client.
|
|
|
|
+ * @throws IOException
|
|
|
|
+ */
|
|
|
|
+ public Factory(int port, int maxcc) throws IOException {
|
|
super("NIOServerCxn.Factory:" + port);
|
|
super("NIOServerCxn.Factory:" + port);
|
|
setDaemon(true);
|
|
setDaemon(true);
|
|
|
|
+ maxClientCnxns = maxcc;
|
|
this.ss = ServerSocketChannel.open();
|
|
this.ss = ServerSocketChannel.open();
|
|
ss.socket().setReuseAddress(true);
|
|
ss.socket().setReuseAddress(true);
|
|
ss.socket().bind(new InetSocketAddress(port));
|
|
ss.socket().bind(new InetSocketAddress(port));
|
|
ss.configureBlocking(false);
|
|
ss.configureBlocking(false);
|
|
- ss.register(selector, SelectionKey.OP_ACCEPT);
|
|
|
|
|
|
+ ss.register(selector, SelectionKey.OP_ACCEPT);
|
|
}
|
|
}
|
|
|
|
|
|
@Override
|
|
@Override
|
|
@@ -109,9 +131,8 @@ public class NIOServerCnxn implements Watcher, ServerCnxn {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
- public void startup(ZooKeeperServer zks)
|
|
|
|
- throws IOException, InterruptedException
|
|
|
|
- {
|
|
|
|
|
|
+ public void startup(ZooKeeperServer zks) throws IOException,
|
|
|
|
+ InterruptedException {
|
|
start();
|
|
start();
|
|
zks.startup();
|
|
zks.startup();
|
|
setZooKeeperServer(zks);
|
|
setZooKeeperServer(zks);
|
|
@@ -142,6 +163,15 @@ public class NIOServerCnxn implements Watcher, ServerCnxn {
|
|
private void addCnxn(NIOServerCnxn cnxn) {
|
|
private void addCnxn(NIOServerCnxn cnxn) {
|
|
synchronized (cnxns) {
|
|
synchronized (cnxns) {
|
|
cnxns.add(cnxn);
|
|
cnxns.add(cnxn);
|
|
|
|
+ synchronized (ipMap){
|
|
|
|
+ InetAddress addr = cnxn.sock.socket().getInetAddress();
|
|
|
|
+ Set<NIOServerCnxn> s = ipMap.get(addr);
|
|
|
|
+ if (s == null) {
|
|
|
|
+ s = new HashSet<NIOServerCnxn>();
|
|
|
|
+ }
|
|
|
|
+ s.add(cnxn);
|
|
|
|
+ ipMap.put(addr,s);
|
|
|
|
+ }
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
@@ -150,7 +180,13 @@ public class NIOServerCnxn implements Watcher, ServerCnxn {
|
|
return new NIOServerCnxn(zks, sock, sk, this);
|
|
return new NIOServerCnxn(zks, sock, sk, this);
|
|
}
|
|
}
|
|
|
|
|
|
- public void run() {
|
|
|
|
|
|
+ private int getClientCnxnCount( InetAddress cl) {
|
|
|
|
+ Set<NIOServerCnxn> s = ipMap.get(cl);
|
|
|
|
+ if (s == null) return 0;
|
|
|
|
+ return s.size();
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ public void run() {
|
|
while (!ss.socket().isClosed()) {
|
|
while (!ss.socket().isClosed()) {
|
|
try {
|
|
try {
|
|
selector.select(1000);
|
|
selector.select(1000);
|
|
@@ -164,13 +200,21 @@ public class NIOServerCnxn implements Watcher, ServerCnxn {
|
|
for (SelectionKey k : selectedList) {
|
|
for (SelectionKey k : selectedList) {
|
|
if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {
|
|
if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {
|
|
SocketChannel sc = ((ServerSocketChannel) k
|
|
SocketChannel sc = ((ServerSocketChannel) k
|
|
- .channel()).accept();
|
|
|
|
- sc.configureBlocking(false);
|
|
|
|
- SelectionKey sk = sc.register(selector,
|
|
|
|
- SelectionKey.OP_READ);
|
|
|
|
- NIOServerCnxn cnxn = createConnection(sc, sk);
|
|
|
|
- sk.attach(cnxn);
|
|
|
|
- addCnxn(cnxn);
|
|
|
|
|
|
+ .channel()).accept();
|
|
|
|
+ InetAddress ia = sc.socket().getInetAddress();
|
|
|
|
+ int cnxncount = getClientCnxnCount(ia);
|
|
|
|
+ if (maxClientCnxns > 0 && cnxncount >= maxClientCnxns){
|
|
|
|
+ LOG.warn("Too many connections from " + ia
|
|
|
|
+ + " - max is " + maxClientCnxns );
|
|
|
|
+ sc.close();
|
|
|
|
+ } else {
|
|
|
|
+ sc.configureBlocking(false);
|
|
|
|
+ SelectionKey sk = sc.register(selector,
|
|
|
|
+ SelectionKey.OP_READ);
|
|
|
|
+ NIOServerCnxn cnxn = createConnection(sc, sk);
|
|
|
|
+ sk.attach(cnxn);
|
|
|
|
+ addCnxn(cnxn);
|
|
|
|
+ }
|
|
} else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
|
|
} else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
|
|
NIOServerCnxn c = (NIOServerCnxn) k.attachment();
|
|
NIOServerCnxn c = (NIOServerCnxn) k.attachment();
|
|
c.doIO(k);
|
|
c.doIO(k);
|
|
@@ -762,11 +806,16 @@ public class NIOServerCnxn implements Watcher, ServerCnxn {
|
|
LOG.warn("Failed to unregister with JMX", e);
|
|
LOG.warn("Failed to unregister with JMX", e);
|
|
}
|
|
}
|
|
jmxConnectionBean = null;
|
|
jmxConnectionBean = null;
|
|
-
|
|
|
|
|
|
+
|
|
if (closed) {
|
|
if (closed) {
|
|
return;
|
|
return;
|
|
}
|
|
}
|
|
closed = true;
|
|
closed = true;
|
|
|
|
+ synchronized (factory.ipMap)
|
|
|
|
+ {
|
|
|
|
+ Set<NIOServerCnxn> s = factory.ipMap.get(sock.socket().getInetAddress());
|
|
|
|
+ s.remove(this);
|
|
|
|
+ }
|
|
synchronized (factory.cnxns) {
|
|
synchronized (factory.cnxns) {
|
|
factory.cnxns.remove(this);
|
|
factory.cnxns.remove(this);
|
|
}
|
|
}
|