|
@@ -45,6 +45,8 @@ import java.util.LinkedList;
|
|
import java.util.List;
|
|
import java.util.List;
|
|
import java.util.Iterator;
|
|
import java.util.Iterator;
|
|
import java.util.Random;
|
|
import java.util.Random;
|
|
|
|
+import java.util.concurrent.BlockingQueue;
|
|
|
|
+import java.util.concurrent.LinkedBlockingQueue;
|
|
|
|
|
|
import org.apache.commons.logging.Log;
|
|
import org.apache.commons.logging.Log;
|
|
import org.apache.commons.logging.LogFactory;
|
|
import org.apache.commons.logging.LogFactory;
|
|
@@ -146,7 +148,7 @@ public abstract class Server {
|
|
private boolean tcpNoDelay; // if T then disable Nagle's Algorithm
|
|
private boolean tcpNoDelay; // if T then disable Nagle's Algorithm
|
|
|
|
|
|
volatile private boolean running = true; // true while server runs
|
|
volatile private boolean running = true; // true while server runs
|
|
- private LinkedList<Call> callQueue = new LinkedList<Call>(); // queued calls
|
|
|
|
|
|
+ private BlockingQueue<Call> callQueue; // queued calls
|
|
|
|
|
|
private List<Connection> connectionList =
|
|
private List<Connection> connectionList =
|
|
Collections.synchronizedList(new LinkedList<Connection>());
|
|
Collections.synchronizedList(new LinkedList<Connection>());
|
|
@@ -321,6 +323,11 @@ public abstract class Server {
|
|
closeCurrentConnection(key, e);
|
|
closeCurrentConnection(key, e);
|
|
cleanupConnections(true);
|
|
cleanupConnections(true);
|
|
try { Thread.sleep(60000); } catch (Exception ie) {}
|
|
try { Thread.sleep(60000); } catch (Exception ie) {}
|
|
|
|
+ } catch (InterruptedException e) {
|
|
|
|
+ if (running) { // unexpected -- log it
|
|
|
|
+ LOG.info(getName() + " caught: " +
|
|
|
|
+ StringUtils.stringifyException(e));
|
|
|
|
+ }
|
|
} catch (Exception e) {
|
|
} catch (Exception e) {
|
|
closeCurrentConnection(key, e);
|
|
closeCurrentConnection(key, e);
|
|
}
|
|
}
|
|
@@ -363,23 +370,28 @@ public abstract class Server {
|
|
void doAccept(SelectionKey key) throws IOException, OutOfMemoryError {
|
|
void doAccept(SelectionKey key) throws IOException, OutOfMemoryError {
|
|
Connection c = null;
|
|
Connection c = null;
|
|
ServerSocketChannel server = (ServerSocketChannel) key.channel();
|
|
ServerSocketChannel server = (ServerSocketChannel) key.channel();
|
|
- SocketChannel channel = server.accept();
|
|
|
|
- channel.configureBlocking(false);
|
|
|
|
- channel.socket().setTcpNoDelay(tcpNoDelay);
|
|
|
|
- SelectionKey readKey = channel.register(selector, SelectionKey.OP_READ);
|
|
|
|
- c = new Connection(readKey, channel, System.currentTimeMillis());
|
|
|
|
- readKey.attach(c);
|
|
|
|
- synchronized (connectionList) {
|
|
|
|
- connectionList.add(numConnections, c);
|
|
|
|
- numConnections++;
|
|
|
|
|
|
+ // accept up to 10 connections
|
|
|
|
+ for (int i=0; i<10; i++) {
|
|
|
|
+ SocketChannel channel = server.accept();
|
|
|
|
+ if (channel==null) return;
|
|
|
|
+
|
|
|
|
+ channel.configureBlocking(false);
|
|
|
|
+ channel.socket().setTcpNoDelay(tcpNoDelay);
|
|
|
|
+ SelectionKey readKey = channel.register(selector, SelectionKey.OP_READ);
|
|
|
|
+ c = new Connection(readKey, channel, System.currentTimeMillis());
|
|
|
|
+ readKey.attach(c);
|
|
|
|
+ synchronized (connectionList) {
|
|
|
|
+ connectionList.add(numConnections, c);
|
|
|
|
+ numConnections++;
|
|
|
|
+ }
|
|
|
|
+ if (LOG.isDebugEnabled())
|
|
|
|
+ LOG.debug("Server connection from " + c.toString() +
|
|
|
|
+ "; # active connections: " + numConnections +
|
|
|
|
+ "; # queued calls: " + callQueue.size());
|
|
}
|
|
}
|
|
- if (LOG.isDebugEnabled())
|
|
|
|
- LOG.debug("Server connection from " + c.toString() +
|
|
|
|
- "; # active connections: " + numConnections +
|
|
|
|
- "; # queued calls: " + callQueue.size());
|
|
|
|
}
|
|
}
|
|
|
|
|
|
- void doRead(SelectionKey key) {
|
|
|
|
|
|
+ void doRead(SelectionKey key) throws InterruptedException {
|
|
int count = 0;
|
|
int count = 0;
|
|
Connection c = (Connection)key.attachment();
|
|
Connection c = (Connection)key.attachment();
|
|
if (c == null) {
|
|
if (c == null) {
|
|
@@ -389,6 +401,8 @@ public abstract class Server {
|
|
|
|
|
|
try {
|
|
try {
|
|
count = c.readAndProcess();
|
|
count = c.readAndProcess();
|
|
|
|
+ } catch (InterruptedException ieo) {
|
|
|
|
+ throw ieo;
|
|
} catch (Exception e) {
|
|
} catch (Exception e) {
|
|
LOG.debug(getName() + ": readAndProcess threw exception " + e + ". Count of bytes read: " + count, e);
|
|
LOG.debug(getName() + ": readAndProcess threw exception " + e + ". Count of bytes read: " + count, e);
|
|
count = -1; //so that the (count < 0) block is executed
|
|
count = -1; //so that the (count < 0) block is executed
|
|
@@ -822,15 +836,7 @@ public abstract class Server {
|
|
param.readFields(dis);
|
|
param.readFields(dis);
|
|
|
|
|
|
Call call = new Call(id, param, this);
|
|
Call call = new Call(id, param, this);
|
|
- synchronized (callQueue) {
|
|
|
|
- if (callQueue.size() >= maxQueueSize) {
|
|
|
|
- Call oldCall = callQueue.removeFirst();
|
|
|
|
- LOG.warn("Call queue overflow discarding oldest call " + oldCall);
|
|
|
|
- }
|
|
|
|
- callQueue.addLast(call); // queue the call
|
|
|
|
- callQueue.notify(); // wake up a waiting handler
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
|
|
+ callQueue.put(call); // queue the call; maybe blocked here
|
|
}
|
|
}
|
|
|
|
|
|
private synchronized void close() throws IOException {
|
|
private synchronized void close() throws IOException {
|
|
@@ -860,14 +866,7 @@ public abstract class Server {
|
|
ByteArrayOutputStream buf = new ByteArrayOutputStream(10240);
|
|
ByteArrayOutputStream buf = new ByteArrayOutputStream(10240);
|
|
while (running) {
|
|
while (running) {
|
|
try {
|
|
try {
|
|
- Call call;
|
|
|
|
- synchronized (callQueue) {
|
|
|
|
- while (running && callQueue.size()==0) { // wait for a call
|
|
|
|
- callQueue.wait(timeout);
|
|
|
|
- }
|
|
|
|
- if (!running) break;
|
|
|
|
- call = callQueue.removeFirst(); // pop the queue
|
|
|
|
- }
|
|
|
|
|
|
+ Call call = callQueue.take(); // pop the queue; maybe blocked here
|
|
|
|
|
|
// throw the message away if it is too old
|
|
// throw the message away if it is too old
|
|
if (System.currentTimeMillis() - call.receivedTime >
|
|
if (System.currentTimeMillis() - call.receivedTime >
|
|
@@ -952,6 +951,7 @@ public abstract class Server {
|
|
this.socketSendBufferSize = 0;
|
|
this.socketSendBufferSize = 0;
|
|
maxCallStartAge = (long) (timeout * MAX_CALL_QUEUE_TIME);
|
|
maxCallStartAge = (long) (timeout * MAX_CALL_QUEUE_TIME);
|
|
maxQueueSize = handlerCount * MAX_QUEUE_SIZE_PER_HANDLER;
|
|
maxQueueSize = handlerCount * MAX_QUEUE_SIZE_PER_HANDLER;
|
|
|
|
+ this.callQueue = new LinkedBlockingQueue<Call>(maxQueueSize);
|
|
this.maxIdleTime = conf.getInt("ipc.client.maxidletime", 120000);
|
|
this.maxIdleTime = conf.getInt("ipc.client.maxidletime", 120000);
|
|
this.maxConnectionsToNuke = conf.getInt("ipc.client.kill.max", 10);
|
|
this.maxConnectionsToNuke = conf.getInt("ipc.client.kill.max", 10);
|
|
this.thresholdIdleConnections = conf.getInt("ipc.client.idlethreshold", 4000);
|
|
this.thresholdIdleConnections = conf.getInt("ipc.client.idlethreshold", 4000);
|