|
@@ -589,6 +589,7 @@ public abstract class Server {
|
|
return true;
|
|
return true;
|
|
}
|
|
}
|
|
if (!call.response.hasRemaining()) {
|
|
if (!call.response.hasRemaining()) {
|
|
|
|
+ call.connection.decRpcCount();
|
|
if (numElements == 1) { // last call fully processes.
|
|
if (numElements == 1) { // last call fully processes.
|
|
done = true; // no more data for this channel.
|
|
done = true; // no more data for this channel.
|
|
} else {
|
|
} else {
|
|
@@ -678,6 +679,7 @@ public abstract class Server {
|
|
private ByteBuffer data;
|
|
private ByteBuffer data;
|
|
private ByteBuffer dataLengthBuffer;
|
|
private ByteBuffer dataLengthBuffer;
|
|
private LinkedList<Call> responseQueue;
|
|
private LinkedList<Call> responseQueue;
|
|
|
|
+ private volatile int rpcCount = 0; // number of outstanding rpcs
|
|
private long lastContact;
|
|
private long lastContact;
|
|
private int dataLength;
|
|
private int dataLength;
|
|
private Socket socket;
|
|
private Socket socket;
|
|
@@ -729,8 +731,23 @@ public abstract class Server {
|
|
return lastContact;
|
|
return lastContact;
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ /* Return true if the connection has no outstanding rpc */
|
|
|
|
+ private boolean isIdle() {
|
|
|
|
+ return rpcCount == 0;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /* Decrement the outstanding RPC count */
|
|
|
|
+ private void decRpcCount() {
|
|
|
|
+ rpcCount--;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /* Increment the outstanding RPC count */
|
|
|
|
+ private void incRpcCount() {
|
|
|
|
+ rpcCount++;
|
|
|
|
+ }
|
|
|
|
+
|
|
private boolean timedOut(long currentTime) {
|
|
private boolean timedOut(long currentTime) {
|
|
- if (currentTime - lastContact > maxIdleTime)
|
|
|
|
|
|
+ if (isIdle() && currentTime - lastContact > maxIdleTime)
|
|
return true;
|
|
return true;
|
|
return false;
|
|
return false;
|
|
}
|
|
}
|
|
@@ -779,6 +796,7 @@ public abstract class Server {
|
|
return 0; //ping message
|
|
return 0; //ping message
|
|
}
|
|
}
|
|
data = ByteBuffer.allocate(dataLength);
|
|
data = ByteBuffer.allocate(dataLength);
|
|
|
|
+ incRpcCount(); // Increment the rpc count
|
|
}
|
|
}
|
|
|
|
|
|
count = channel.read(data);
|
|
count = channel.read(data);
|
|
@@ -925,7 +943,7 @@ public abstract class Server {
|
|
this.socketSendBufferSize = 0;
|
|
this.socketSendBufferSize = 0;
|
|
this.maxQueueSize = handlerCount * MAX_QUEUE_SIZE_PER_HANDLER;
|
|
this.maxQueueSize = handlerCount * MAX_QUEUE_SIZE_PER_HANDLER;
|
|
this.callQueue = new LinkedBlockingQueue<Call>(maxQueueSize);
|
|
this.callQueue = new LinkedBlockingQueue<Call>(maxQueueSize);
|
|
- this.maxIdleTime = conf.getInt("ipc.client.maxidletime", 120000);
|
|
|
|
|
|
+ this.maxIdleTime = 2*conf.getInt("ipc.client.connection.maxidletime", 1000);
|
|
this.maxConnectionsToNuke = conf.getInt("ipc.client.kill.max", 10);
|
|
this.maxConnectionsToNuke = conf.getInt("ipc.client.kill.max", 10);
|
|
this.thresholdIdleConnections = conf.getInt("ipc.client.idlethreshold", 4000);
|
|
this.thresholdIdleConnections = conf.getInt("ipc.client.idlethreshold", 4000);
|
|
|
|
|