|
@@ -51,11 +51,13 @@ import java.util.Iterator;
|
|
|
import java.util.LinkedList;
|
|
|
import java.util.List;
|
|
|
import java.util.Map;
|
|
|
-import java.util.Random;
|
|
|
import java.util.Set;
|
|
|
+import java.util.Timer;
|
|
|
+import java.util.TimerTask;
|
|
|
import java.util.concurrent.BlockingQueue;
|
|
|
import java.util.concurrent.ConcurrentHashMap;
|
|
|
import java.util.concurrent.LinkedBlockingQueue;
|
|
|
+import java.util.concurrent.atomic.AtomicInteger;
|
|
|
|
|
|
import javax.security.sasl.Sasl;
|
|
|
import javax.security.sasl.SaslException;
|
|
@@ -345,17 +347,8 @@ public abstract class Server {
|
|
|
private int port; // port we listen on
|
|
|
private int handlerCount; // number of handler threads
|
|
|
private int readThreads; // number of read threads
|
|
|
+ private int readerPendingConnectionQueue; // number of connections to queue per read thread
|
|
|
private Class<? extends Writable> rpcRequestClass; // class used for deserializing the rpc request
|
|
|
- private int maxIdleTime; // the maximum idle time after
|
|
|
- // which a client may be disconnected
|
|
|
- private int thresholdIdleConnections; // the number of idle connections
|
|
|
- // after which we will start
|
|
|
- // cleaning up idle
|
|
|
- // connections
|
|
|
- int maxConnectionsToNuke; // the max number of
|
|
|
- // connections to nuke
|
|
|
- //during a cleanup
|
|
|
-
|
|
|
protected RpcMetrics rpcMetrics;
|
|
|
protected RpcDetailedMetrics rpcDetailedMetrics;
|
|
|
|
|
@@ -373,13 +366,10 @@ public abstract class Server {
|
|
|
volatile private boolean running = true; // true while server runs
|
|
|
private BlockingQueue<Call> callQueue; // queued calls
|
|
|
|
|
|
- private List<Connection> connectionList =
|
|
|
- Collections.synchronizedList(new LinkedList<Connection>());
|
|
|
- //maintain a list
|
|
|
- //of client connections
|
|
|
+ // maintains the set of client connections and handles idle timeouts
|
|
|
+ private ConnectionManager connectionManager;
|
|
|
private Listener listener = null;
|
|
|
private Responder responder = null;
|
|
|
- private int numConnections = 0;
|
|
|
private Handler[] handlers = null;
|
|
|
|
|
|
/**
|
|
@@ -449,8 +439,8 @@ public abstract class Server {
|
|
|
}
|
|
|
|
|
|
@VisibleForTesting
|
|
|
- List<Connection> getConnections() {
|
|
|
- return connectionList;
|
|
|
+ Connection[] getConnections() {
|
|
|
+ return connectionManager.toArray();
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -518,11 +508,6 @@ public abstract class Server {
|
|
|
private Reader[] readers = null;
|
|
|
private int currentReader = 0;
|
|
|
private InetSocketAddress address; //the address we bind at
|
|
|
- private Random rand = new Random();
|
|
|
- private long lastCleanupRunTime = 0; //the last time when a cleanup connec-
|
|
|
- //-tion (for idle connections) ran
|
|
|
- private long cleanupInterval = 10000; //the minimum interval between
|
|
|
- //two cleanup runs
|
|
|
private int backlogLength = conf.getInt(
|
|
|
CommonConfigurationKeysPublic.IPC_SERVER_LISTEN_QUEUE_SIZE_KEY,
|
|
|
CommonConfigurationKeysPublic.IPC_SERVER_LISTEN_QUEUE_SIZE_DEFAULT);
|
|
@@ -553,12 +538,14 @@ public abstract class Server {
|
|
|
}
|
|
|
|
|
|
private class Reader extends Thread {
|
|
|
- private volatile boolean adding = false;
|
|
|
+ final private BlockingQueue<Connection> pendingConnections;
|
|
|
private final Selector readSelector;
|
|
|
|
|
|
Reader(String name) throws IOException {
|
|
|
super(name);
|
|
|
|
|
|
+ this.pendingConnections =
|
|
|
+ new LinkedBlockingQueue<Connection>(readerPendingConnectionQueue);
|
|
|
this.readSelector = Selector.open();
|
|
|
}
|
|
|
|
|
@@ -580,10 +567,14 @@ public abstract class Server {
|
|
|
while (running) {
|
|
|
SelectionKey key = null;
|
|
|
try {
|
|
|
+ // consume as many connections as currently queued to avoid
|
|
|
+ // unbridled acceptance of connections that starves the select
|
|
|
+ int size = pendingConnections.size();
|
|
|
+ for (int i=size; i>0; i--) {
|
|
|
+ Connection conn = pendingConnections.take();
|
|
|
+ conn.channel.register(readSelector, SelectionKey.OP_READ, conn);
|
|
|
+ }
|
|
|
readSelector.select();
|
|
|
- while (adding) {
|
|
|
- this.wait(1000);
|
|
|
- }
|
|
|
|
|
|
Iterator<SelectionKey> iter = readSelector.selectedKeys().iterator();
|
|
|
while (iter.hasNext()) {
|
|
@@ -607,26 +598,14 @@ public abstract class Server {
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * This gets reader into the state that waits for the new channel
|
|
|
- * to be registered with readSelector. If it was waiting in select()
|
|
|
- * the thread will be woken up, otherwise whenever select() is called
|
|
|
- * it will return even if there is nothing to read and wait
|
|
|
- * in while(adding) for finishAdd call
|
|
|
+ * Updating the readSelector while it's being used is not thread-safe,
|
|
|
+ * so the connection must be queued. The reader will drain the queue
|
|
|
+ * and update its readSelector before performing the next select
|
|
|
*/
|
|
|
- public void startAdd() {
|
|
|
- adding = true;
|
|
|
+ public void addConnection(Connection conn) throws InterruptedException {
|
|
|
+ pendingConnections.put(conn);
|
|
|
readSelector.wakeup();
|
|
|
}
|
|
|
-
|
|
|
- public synchronized SelectionKey registerChannel(SocketChannel channel)
|
|
|
- throws IOException {
|
|
|
- return channel.register(readSelector, SelectionKey.OP_READ);
|
|
|
- }
|
|
|
-
|
|
|
- public synchronized void finishAdd() {
|
|
|
- adding = false;
|
|
|
- this.notify();
|
|
|
- }
|
|
|
|
|
|
void shutdown() {
|
|
|
assert !running;
|
|
@@ -638,58 +617,12 @@ public abstract class Server {
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
- /** cleanup connections from connectionList. Choose a random range
|
|
|
- * to scan and also have a limit on the number of the connections
|
|
|
- * that will be cleanedup per run. The criteria for cleanup is the time
|
|
|
- * for which the connection was idle. If 'force' is true then all
|
|
|
- * connections will be looked at for the cleanup.
|
|
|
- */
|
|
|
- private void cleanupConnections(boolean force) {
|
|
|
- if (force || numConnections > thresholdIdleConnections) {
|
|
|
- long currentTime = Time.now();
|
|
|
- if (!force && (currentTime - lastCleanupRunTime) < cleanupInterval) {
|
|
|
- return;
|
|
|
- }
|
|
|
- int start = 0;
|
|
|
- int end = numConnections - 1;
|
|
|
- if (!force) {
|
|
|
- start = rand.nextInt() % numConnections;
|
|
|
- end = rand.nextInt() % numConnections;
|
|
|
- int temp;
|
|
|
- if (end < start) {
|
|
|
- temp = start;
|
|
|
- start = end;
|
|
|
- end = temp;
|
|
|
- }
|
|
|
- }
|
|
|
- int i = start;
|
|
|
- int numNuked = 0;
|
|
|
- while (i <= end) {
|
|
|
- Connection c;
|
|
|
- synchronized (connectionList) {
|
|
|
- try {
|
|
|
- c = connectionList.get(i);
|
|
|
- } catch (Exception e) {return;}
|
|
|
- }
|
|
|
- if (c.timedOut(currentTime)) {
|
|
|
- if (LOG.isDebugEnabled())
|
|
|
- LOG.debug(getName() + ": disconnecting client " + c.getHostAddress());
|
|
|
- closeConnection(c);
|
|
|
- numNuked++;
|
|
|
- end--;
|
|
|
- c = null;
|
|
|
- if (!force && numNuked == maxConnectionsToNuke) break;
|
|
|
- }
|
|
|
- else i++;
|
|
|
- }
|
|
|
- lastCleanupRunTime = Time.now();
|
|
|
- }
|
|
|
- }
|
|
|
|
|
|
@Override
|
|
|
public void run() {
|
|
|
LOG.info(getName() + ": starting");
|
|
|
SERVER.set(Server.this);
|
|
|
+ connectionManager.startIdleScan();
|
|
|
while (running) {
|
|
|
SelectionKey key = null;
|
|
|
try {
|
|
@@ -713,12 +646,11 @@ public abstract class Server {
|
|
|
// some thread(s) a chance to finish
|
|
|
LOG.warn("Out of Memory in server select", e);
|
|
|
closeCurrentConnection(key, e);
|
|
|
- cleanupConnections(true);
|
|
|
+ connectionManager.closeIdle(true);
|
|
|
try { Thread.sleep(60000); } catch (Exception ie) {}
|
|
|
} catch (Exception e) {
|
|
|
closeCurrentConnection(key, e);
|
|
|
}
|
|
|
- cleanupConnections(false);
|
|
|
}
|
|
|
LOG.info("Stopping " + this.getName());
|
|
|
|
|
@@ -731,10 +663,9 @@ public abstract class Server {
|
|
|
selector= null;
|
|
|
acceptChannel= null;
|
|
|
|
|
|
- // clean up all connections
|
|
|
- while (!connectionList.isEmpty()) {
|
|
|
- closeConnection(connectionList.remove(0));
|
|
|
- }
|
|
|
+ // close all connections
|
|
|
+ connectionManager.stopIdleScan();
|
|
|
+ connectionManager.closeAll();
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -742,8 +673,6 @@ public abstract class Server {
|
|
|
if (key != null) {
|
|
|
Connection c = (Connection)key.attachment();
|
|
|
if (c != null) {
|
|
|
- if (LOG.isDebugEnabled())
|
|
|
- LOG.debug(getName() + ": disconnecting client " + c.getHostAddress());
|
|
|
closeConnection(c);
|
|
|
c = null;
|
|
|
}
|
|
@@ -754,8 +683,7 @@ public abstract class Server {
|
|
|
return (InetSocketAddress)acceptChannel.socket().getLocalSocketAddress();
|
|
|
}
|
|
|
|
|
|
- void doAccept(SelectionKey key) throws IOException, OutOfMemoryError {
|
|
|
- Connection c = null;
|
|
|
+ void doAccept(SelectionKey key) throws InterruptedException, IOException, OutOfMemoryError {
|
|
|
ServerSocketChannel server = (ServerSocketChannel) key.channel();
|
|
|
SocketChannel channel;
|
|
|
while ((channel = server.accept()) != null) {
|
|
@@ -765,22 +693,9 @@ public abstract class Server {
|
|
|
channel.socket().setKeepAlive(true);
|
|
|
|
|
|
Reader reader = getReader();
|
|
|
- try {
|
|
|
- reader.startAdd();
|
|
|
- SelectionKey readKey = reader.registerChannel(channel);
|
|
|
- c = new Connection(readKey, channel, Time.now());
|
|
|
- readKey.attach(c);
|
|
|
- synchronized (connectionList) {
|
|
|
- connectionList.add(numConnections, c);
|
|
|
- numConnections++;
|
|
|
- }
|
|
|
- if (LOG.isDebugEnabled())
|
|
|
- LOG.debug("Server connection from " + c.toString() +
|
|
|
- "; # active connections: " + numConnections +
|
|
|
- "; # queued calls: " + callQueue.size());
|
|
|
- } finally {
|
|
|
- reader.finishAdd();
|
|
|
- }
|
|
|
+ Connection c = connectionManager.register(channel);
|
|
|
+ key.attach(c); // so closeCurrentConnection can get the object
|
|
|
+ reader.addConnection(c);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -808,10 +723,6 @@ public abstract class Server {
|
|
|
count = -1; //so that the (count < 0) block is executed
|
|
|
}
|
|
|
if (count < 0) {
|
|
|
- if (LOG.isDebugEnabled())
|
|
|
- LOG.debug(getName() + ": disconnecting client " +
|
|
|
- c + ". Number of active connections: "+
|
|
|
- numConnections);
|
|
|
closeConnection(c);
|
|
|
c = null;
|
|
|
}
|
|
@@ -1190,8 +1101,7 @@ public abstract class Server {
|
|
|
private boolean sentNegotiate = false;
|
|
|
private boolean useWrap = false;
|
|
|
|
|
|
- public Connection(SelectionKey key, SocketChannel channel,
|
|
|
- long lastContact) {
|
|
|
+ public Connection(SocketChannel channel, long lastContact) {
|
|
|
this.channel = channel;
|
|
|
this.lastContact = lastContact;
|
|
|
this.data = null;
|
|
@@ -1253,12 +1163,6 @@ public abstract class Server {
|
|
|
rpcCount++;
|
|
|
}
|
|
|
|
|
|
- private boolean timedOut(long currentTime) {
|
|
|
- if (isIdle() && currentTime - lastContact > maxIdleTime)
|
|
|
- return true;
|
|
|
- return false;
|
|
|
- }
|
|
|
-
|
|
|
private UserGroupInformation getAuthorizedUgi(String authorizedId)
|
|
|
throws InvalidToken, AccessControlException {
|
|
|
if (authMethod == AuthMethod.TOKEN) {
|
|
@@ -2189,16 +2093,10 @@ public abstract class Server {
|
|
|
CommonConfigurationKeys.IPC_SERVER_RPC_READ_THREADS_KEY,
|
|
|
CommonConfigurationKeys.IPC_SERVER_RPC_READ_THREADS_DEFAULT);
|
|
|
}
|
|
|
+ this.readerPendingConnectionQueue = conf.getInt(
|
|
|
+ CommonConfigurationKeys.IPC_SERVER_RPC_READ_CONNECTION_QUEUE_SIZE_KEY,
|
|
|
+ CommonConfigurationKeys.IPC_SERVER_RPC_READ_CONNECTION_QUEUE_SIZE_DEFAULT);
|
|
|
this.callQueue = new LinkedBlockingQueue<Call>(maxQueueSize);
|
|
|
- this.maxIdleTime = 2 * conf.getInt(
|
|
|
- CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_KEY,
|
|
|
- CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_DEFAULT);
|
|
|
- this.maxConnectionsToNuke = conf.getInt(
|
|
|
- CommonConfigurationKeysPublic.IPC_CLIENT_KILL_MAX_KEY,
|
|
|
- CommonConfigurationKeysPublic.IPC_CLIENT_KILL_MAX_DEFAULT);
|
|
|
- this.thresholdIdleConnections = conf.getInt(
|
|
|
- CommonConfigurationKeysPublic.IPC_CLIENT_IDLETHRESHOLD_KEY,
|
|
|
- CommonConfigurationKeysPublic.IPC_CLIENT_IDLETHRESHOLD_DEFAULT);
|
|
|
this.secretManager = (SecretManager<TokenIdentifier>) secretManager;
|
|
|
this.authorize =
|
|
|
conf.getBoolean(CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION,
|
|
@@ -2219,6 +2117,7 @@ public abstract class Server {
|
|
|
|
|
|
// Create the responder here
|
|
|
responder = new Responder();
|
|
|
+ connectionManager = new ConnectionManager();
|
|
|
|
|
|
if (secretManager != null) {
|
|
|
SaslRpcServer.init(conf);
|
|
@@ -2277,11 +2176,7 @@ public abstract class Server {
|
|
|
}
|
|
|
|
|
|
private void closeConnection(Connection connection) {
|
|
|
- synchronized (connectionList) {
|
|
|
- if (connectionList.remove(connection))
|
|
|
- numConnections--;
|
|
|
- }
|
|
|
- connection.close();
|
|
|
+ connectionManager.close(connection);
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -2536,7 +2431,7 @@ public abstract class Server {
|
|
|
* @return the number of open rpc connections
|
|
|
*/
|
|
|
public int getNumOpenConnections() {
|
|
|
- return numConnections;
|
|
|
+ return connectionManager.size();
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -2646,4 +2541,151 @@ public abstract class Server {
|
|
|
int nBytes = initialRemaining - buf.remaining();
|
|
|
return (nBytes > 0) ? nBytes : ret;
|
|
|
}
|
|
|
+
|
|
|
+ private class ConnectionManager {
|
|
|
+ final private AtomicInteger count = new AtomicInteger();
|
|
|
+ final private Set<Connection> connections;
|
|
|
+
|
|
|
+ final private Timer idleScanTimer;
|
|
|
+ final private int idleScanThreshold;
|
|
|
+ final private int idleScanInterval;
|
|
|
+ final private int maxIdleTime;
|
|
|
+ final private int maxIdleToClose;
|
|
|
+
|
|
|
+ ConnectionManager() {
|
|
|
+ this.idleScanTimer = new Timer(
|
|
|
+ "IPC Server idle connection scanner for port " + getPort(), true);
|
|
|
+ this.idleScanThreshold = conf.getInt(
|
|
|
+ CommonConfigurationKeysPublic.IPC_CLIENT_IDLETHRESHOLD_KEY,
|
|
|
+ CommonConfigurationKeysPublic.IPC_CLIENT_IDLETHRESHOLD_DEFAULT);
|
|
|
+ this.idleScanInterval = conf.getInt(
|
|
|
+ CommonConfigurationKeys.IPC_CLIENT_CONNECTION_IDLESCANINTERVAL_KEY,
|
|
|
+ CommonConfigurationKeys.IPC_CLIENT_CONNECTION_IDLESCANINTERVAL_DEFAULT);
|
|
|
+ this.maxIdleTime = 2 * conf.getInt(
|
|
|
+ CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_KEY,
|
|
|
+ CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_DEFAULT);
|
|
|
+ this.maxIdleToClose = conf.getInt(
|
|
|
+ CommonConfigurationKeysPublic.IPC_CLIENT_KILL_MAX_KEY,
|
|
|
+ CommonConfigurationKeysPublic.IPC_CLIENT_KILL_MAX_DEFAULT);
|
|
|
+ // create a set with concurrency -and- a thread-safe iterator, add 2
|
|
|
+ // for listener and idle closer threads
|
|
|
+ this.connections = Collections.newSetFromMap(
|
|
|
+ new ConcurrentHashMap<Connection,Boolean>(
|
|
|
+ maxQueueSize, 0.75f, readThreads+2));
|
|
|
+ }
|
|
|
+
|
|
|
+ private boolean add(Connection connection) {
|
|
|
+ boolean added = connections.add(connection);
|
|
|
+ if (added) {
|
|
|
+ count.getAndIncrement();
|
|
|
+ }
|
|
|
+ return added;
|
|
|
+ }
|
|
|
+
|
|
|
+ private boolean remove(Connection connection) {
|
|
|
+ boolean removed = connections.remove(connection);
|
|
|
+ if (removed) {
|
|
|
+ count.getAndDecrement();
|
|
|
+ }
|
|
|
+ return removed;
|
|
|
+ }
|
|
|
+
|
|
|
+ int size() {
|
|
|
+ return count.get();
|
|
|
+ }
|
|
|
+
|
|
|
+ Connection[] toArray() {
|
|
|
+ return connections.toArray(new Connection[0]);
|
|
|
+ }
|
|
|
+
|
|
|
+ Connection register(SocketChannel channel) {
|
|
|
+ Connection connection = new Connection(channel, Time.now());
|
|
|
+ add(connection);
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
+ LOG.debug("Server connection from " + connection +
|
|
|
+ "; # active connections: " + size() +
|
|
|
+ "; # queued calls: " + callQueue.size());
|
|
|
+ }
|
|
|
+ return connection;
|
|
|
+ }
|
|
|
+
|
|
|
+ boolean close(Connection connection) {
|
|
|
+ boolean exists = remove(connection);
|
|
|
+ if (exists) {
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
+ LOG.debug(Thread.currentThread().getName() +
|
|
|
+ ": disconnecting client " + connection +
|
|
|
+ ". Number of active connections: "+ size());
|
|
|
+ }
|
|
|
+ // only close if actually removed to avoid double-closing due
|
|
|
+ // to possible races
|
|
|
+ connection.close();
|
|
|
+ }
|
|
|
+ return exists;
|
|
|
+ }
|
|
|
+
|
|
|
+ // synch'ed to avoid explicit invocation upon OOM from colliding with
|
|
|
+ // timer task firing
|
|
|
+ synchronized void closeIdle(boolean scanAll) {
|
|
|
+ long minLastContact = Time.now() - maxIdleTime;
|
|
|
+ // concurrent iterator might miss new connections added
|
|
|
+ // during the iteration, but that's ok because they won't
|
|
|
+ // be idle yet anyway and will be caught on next scan
|
|
|
+ int closed = 0;
|
|
|
+ for (Connection connection : connections) {
|
|
|
+ // stop if connections dropped below threshold unless scanning all
|
|
|
+ if (!scanAll && size() < idleScanThreshold) {
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ // stop if not scanning all and max connections are closed
|
|
|
+ if (connection.isIdle() &&
|
|
|
+ connection.getLastContact() < minLastContact &&
|
|
|
+ close(connection) &&
|
|
|
+ !scanAll && (++closed == maxIdleToClose)) {
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ void closeAll() {
|
|
|
+ // use a copy of the connections to be absolutely sure the concurrent
|
|
|
+ // iterator doesn't miss a connection
|
|
|
+ for (Connection connection : toArray()) {
|
|
|
+ close(connection);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ void startIdleScan() {
|
|
|
+ scheduleIdleScanTask();
|
|
|
+ }
|
|
|
+
|
|
|
+ void stopIdleScan() {
|
|
|
+ idleScanTimer.cancel();
|
|
|
+ }
|
|
|
+
|
|
|
+ private void scheduleIdleScanTask() {
|
|
|
+ if (!running) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ TimerTask idleScanTask = new TimerTask(){
|
|
|
+ @Override
|
|
|
+ public void run() {
|
|
|
+ if (!running) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
+ LOG.debug(Thread.currentThread().getName()+": task running");
|
|
|
+ }
|
|
|
+ try {
|
|
|
+ closeIdle(false);
|
|
|
+ } finally {
|
|
|
+ // explicitly reschedule so next execution occurs relative
|
|
|
+ // to the end of this scan, not the beginning
|
|
|
+ scheduleIdleScanTask();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ };
|
|
|
+ idleScanTimer.schedule(idleScanTask, idleScanInterval);
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|