|
@@ -25,6 +25,8 @@ import java.io.ByteArrayInputStream;
|
|
import java.io.ByteArrayOutputStream;
|
|
import java.io.ByteArrayOutputStream;
|
|
|
|
|
|
import java.nio.ByteBuffer;
|
|
import java.nio.ByteBuffer;
|
|
|
|
+import java.nio.channels.CancelledKeyException;
|
|
|
|
+import java.nio.channels.ClosedChannelException;
|
|
import java.nio.channels.SelectionKey;
|
|
import java.nio.channels.SelectionKey;
|
|
import java.nio.channels.Selector;
|
|
import java.nio.channels.Selector;
|
|
import java.nio.channels.ServerSocketChannel;
|
|
import java.nio.channels.ServerSocketChannel;
|
|
@@ -293,15 +295,9 @@ public abstract class Server {
|
|
} catch (Exception e) {return;}
|
|
} catch (Exception e) {return;}
|
|
}
|
|
}
|
|
if (c.timedOut(currentTime)) {
|
|
if (c.timedOut(currentTime)) {
|
|
- synchronized (connectionList) {
|
|
|
|
- if (connectionList.remove(c))
|
|
|
|
- numConnections--;
|
|
|
|
- }
|
|
|
|
- try {
|
|
|
|
- if (LOG.isDebugEnabled())
|
|
|
|
- LOG.debug(getName() + ": disconnecting client " + c.getHostAddress());
|
|
|
|
- c.close();
|
|
|
|
- } catch (Exception e) {}
|
|
|
|
|
|
+ if (LOG.isDebugEnabled())
|
|
|
|
+ LOG.debug(getName() + ": disconnecting client " + c.getHostAddress());
|
|
|
|
+ closeConnection(c);
|
|
numNuked++;
|
|
numNuked++;
|
|
end--;
|
|
end--;
|
|
c = null;
|
|
c = null;
|
|
@@ -334,7 +330,6 @@ public abstract class Server {
|
|
doRead(key);
|
|
doRead(key);
|
|
}
|
|
}
|
|
} catch (IOException e) {
|
|
} catch (IOException e) {
|
|
- key.cancel();
|
|
|
|
}
|
|
}
|
|
key = null;
|
|
key = null;
|
|
}
|
|
}
|
|
@@ -369,15 +364,9 @@ public abstract class Server {
|
|
if (key != null) {
|
|
if (key != null) {
|
|
Connection c = (Connection)key.attachment();
|
|
Connection c = (Connection)key.attachment();
|
|
if (c != null) {
|
|
if (c != null) {
|
|
- synchronized (connectionList) {
|
|
|
|
- if (connectionList.remove(c))
|
|
|
|
- numConnections--;
|
|
|
|
- }
|
|
|
|
- try {
|
|
|
|
- if (LOG.isDebugEnabled())
|
|
|
|
- LOG.debug(getName() + ": disconnecting client " + c.getHostAddress());
|
|
|
|
- c.close();
|
|
|
|
- } catch (Exception ex) {}
|
|
|
|
|
|
+ if (LOG.isDebugEnabled())
|
|
|
|
+ LOG.debug(getName() + ": disconnecting client " + c.getHostAddress());
|
|
|
|
+ closeConnection(c);
|
|
c = null;
|
|
c = null;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
@@ -417,22 +406,15 @@ public abstract class Server {
|
|
try {
|
|
try {
|
|
count = c.readAndProcess();
|
|
count = c.readAndProcess();
|
|
} catch (Exception e) {
|
|
} catch (Exception e) {
|
|
- key.cancel();
|
|
|
|
LOG.debug(getName() + ": readAndProcess threw exception " + e + ". Count of bytes read: " + count, e);
|
|
LOG.debug(getName() + ": readAndProcess threw exception " + e + ". Count of bytes read: " + count, e);
|
|
count = -1; //so that the (count < 0) block is executed
|
|
count = -1; //so that the (count < 0) block is executed
|
|
}
|
|
}
|
|
if (count < 0) {
|
|
if (count < 0) {
|
|
- synchronized (connectionList) {
|
|
|
|
- if (connectionList.remove(c))
|
|
|
|
- numConnections--;
|
|
|
|
- }
|
|
|
|
- try {
|
|
|
|
- if (LOG.isDebugEnabled())
|
|
|
|
- LOG.debug(getName() + ": disconnecting client " +
|
|
|
|
- c.getHostAddress() + ". Number of active connections: "+
|
|
|
|
- numConnections);
|
|
|
|
- c.close();
|
|
|
|
- } catch (Exception e) {}
|
|
|
|
|
|
+ if (LOG.isDebugEnabled())
|
|
|
|
+ LOG.debug(getName() + ": disconnecting client " +
|
|
|
|
+ c.getHostAddress() + ". Number of active connections: "+
|
|
|
|
+ numConnections);
|
|
|
|
+ closeConnection(c);
|
|
c = null;
|
|
c = null;
|
|
}
|
|
}
|
|
else {
|
|
else {
|
|
@@ -458,13 +440,13 @@ public abstract class Server {
|
|
// Sends responses of RPC back to clients.
|
|
// Sends responses of RPC back to clients.
|
|
private class Responder extends Thread {
|
|
private class Responder extends Thread {
|
|
private Selector writeSelector;
|
|
private Selector writeSelector;
|
|
- private boolean pending; // call waiting to be enqueued
|
|
|
|
|
|
+ private int pending; // connections waiting to register
|
|
|
|
|
|
Responder() throws IOException {
|
|
Responder() throws IOException {
|
|
this.setName("IPC Server Responder");
|
|
this.setName("IPC Server Responder");
|
|
this.setDaemon(true);
|
|
this.setDaemon(true);
|
|
writeSelector = Selector.open(); // create a selector
|
|
writeSelector = Selector.open(); // create a selector
|
|
- pending = false;
|
|
|
|
|
|
+ pending = 0;
|
|
}
|
|
}
|
|
|
|
|
|
@Override
|
|
@Override
|
|
@@ -474,13 +456,12 @@ public abstract class Server {
|
|
long lastPurgeTime = 0; // last check for old calls.
|
|
long lastPurgeTime = 0; // last check for old calls.
|
|
|
|
|
|
while (running) {
|
|
while (running) {
|
|
- SelectionKey key = null;
|
|
|
|
try {
|
|
try {
|
|
waitPending(); // If a channel is being registered, wait.
|
|
waitPending(); // If a channel is being registered, wait.
|
|
writeSelector.select(maxCallStartAge);
|
|
writeSelector.select(maxCallStartAge);
|
|
- Iterator iter = writeSelector.selectedKeys().iterator();
|
|
|
|
|
|
+ Iterator<SelectionKey> iter = writeSelector.selectedKeys().iterator();
|
|
while (iter.hasNext()) {
|
|
while (iter.hasNext()) {
|
|
- key = (SelectionKey)iter.next();
|
|
|
|
|
|
+ SelectionKey key = iter.next();
|
|
iter.remove();
|
|
iter.remove();
|
|
try {
|
|
try {
|
|
if (key.isValid() && key.isWritable()) {
|
|
if (key.isValid() && key.isWritable()) {
|
|
@@ -488,9 +469,7 @@ public abstract class Server {
|
|
}
|
|
}
|
|
} catch (IOException e) {
|
|
} catch (IOException e) {
|
|
LOG.info(getName() + ": doAsyncWrite threw exception " + e);
|
|
LOG.info(getName() + ": doAsyncWrite threw exception " + e);
|
|
- key.cancel();
|
|
|
|
}
|
|
}
|
|
- key = null;
|
|
|
|
}
|
|
}
|
|
long now = System.currentTimeMillis();
|
|
long now = System.currentTimeMillis();
|
|
if (now < lastPurgeTime + maxCallStartAge) {
|
|
if (now < lastPurgeTime + maxCallStartAge) {
|
|
@@ -504,7 +483,7 @@ public abstract class Server {
|
|
LOG.debug("Checking for old call responses.");
|
|
LOG.debug("Checking for old call responses.");
|
|
iter = writeSelector.keys().iterator();
|
|
iter = writeSelector.keys().iterator();
|
|
while (iter.hasNext()) {
|
|
while (iter.hasNext()) {
|
|
- key = (SelectionKey)iter.next();
|
|
|
|
|
|
+ SelectionKey key = iter.next();
|
|
try {
|
|
try {
|
|
doPurge(key, now);
|
|
doPurge(key, now);
|
|
} catch (IOException e) {
|
|
} catch (IOException e) {
|
|
@@ -535,8 +514,20 @@ public abstract class Server {
|
|
if (key.channel() != call.connection.channel) {
|
|
if (key.channel() != call.connection.channel) {
|
|
throw new IOException("doAsyncWrite: bad channel");
|
|
throw new IOException("doAsyncWrite: bad channel");
|
|
}
|
|
}
|
|
- if (processResponse(call.connection.responseQueue)) {
|
|
|
|
- key.cancel(); // remove item from selector.
|
|
|
|
|
|
+
|
|
|
|
+ synchronized(call.connection.responseQueue) {
|
|
|
|
+ if (processResponse(call.connection.responseQueue, false)) {
|
|
|
|
+ try {
|
|
|
|
+ key.interestOps(0);
|
|
|
|
+ } catch (CancelledKeyException e) {
|
|
|
|
+ /* The Listener/reader might have closed the socket.
|
|
|
|
+ * We don't explicitly cancel the key, so not sure if this will
|
|
|
|
+ * ever fire.
|
|
|
|
+ * This warning could be removed.
|
|
|
|
+ */
|
|
|
|
+ LOG.warn("Exception while changing ops : " + e);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
@@ -553,11 +544,22 @@ public abstract class Server {
|
|
LOG.info("doPurge: bad channel");
|
|
LOG.info("doPurge: bad channel");
|
|
return;
|
|
return;
|
|
}
|
|
}
|
|
|
|
+ boolean close = false;
|
|
LinkedList<Call> responseQueue = call.connection.responseQueue;
|
|
LinkedList<Call> responseQueue = call.connection.responseQueue;
|
|
synchronized (responseQueue) {
|
|
synchronized (responseQueue) {
|
|
- Iterator iter = responseQueue.listIterator(0);
|
|
|
|
|
|
+ Iterator<Call> iter = responseQueue.listIterator(0);
|
|
while (iter.hasNext()) {
|
|
while (iter.hasNext()) {
|
|
- call = (Call)iter.next();
|
|
|
|
|
|
+ call = iter.next();
|
|
|
|
+ if (call.response.position() > 0) {
|
|
|
|
+ /* We should probably use a different a different start time
|
|
|
|
+ * than receivedTime. receivedTime starts when the RPC
|
|
|
|
+ * was first read.
|
|
|
|
+ * We have written a partial response. will close the
|
|
|
|
+ * connection for now.
|
|
|
|
+ */
|
|
|
|
+ close = true;
|
|
|
|
+ break;
|
|
|
|
+ }
|
|
if (now > call.receivedTime + maxCallStartAge) {
|
|
if (now > call.receivedTime + maxCallStartAge) {
|
|
LOG.info(getName() + ", call " + call +
|
|
LOG.info(getName() + ", call " + call +
|
|
": response discarded for being too old (" +
|
|
": response discarded for being too old (" +
|
|
@@ -565,19 +567,18 @@ public abstract class Server {
|
|
iter.remove();
|
|
iter.remove();
|
|
}
|
|
}
|
|
}
|
|
}
|
|
-
|
|
|
|
- // If all the calls for this channel were removed, then
|
|
|
|
- // remove this channel from the selector
|
|
|
|
- if (responseQueue.size() == 0) {
|
|
|
|
- key.cancel();
|
|
|
|
- }
|
|
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ if (close) {
|
|
|
|
+ closeConnection(call.connection);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
// Processes one response. Returns true if there are no more pending
|
|
// Processes one response. Returns true if there are no more pending
|
|
// data for this channel.
|
|
// data for this channel.
|
|
//
|
|
//
|
|
- private boolean processResponse(LinkedList<Call> responseQueue) throws IOException {
|
|
|
|
|
|
+ private boolean processResponse(LinkedList<Call> responseQueue,
|
|
|
|
+ boolean inHandler) throws IOException {
|
|
boolean error = true;
|
|
boolean error = true;
|
|
boolean done = false; // there is more data for this channel.
|
|
boolean done = false; // there is more data for this channel.
|
|
int numElements = 0;
|
|
int numElements = 0;
|
|
@@ -595,7 +596,6 @@ public abstract class Server {
|
|
//
|
|
//
|
|
// Extract the first call
|
|
// Extract the first call
|
|
//
|
|
//
|
|
- int numBytes = 0;
|
|
|
|
call = responseQueue.removeFirst();
|
|
call = responseQueue.removeFirst();
|
|
SocketChannel channel = call.connection.channel;
|
|
SocketChannel channel = call.connection.channel;
|
|
if (LOG.isDebugEnabled()) {
|
|
if (LOG.isDebugEnabled()) {
|
|
@@ -605,7 +605,10 @@ public abstract class Server {
|
|
//
|
|
//
|
|
// Send as much data as we can in the non-blocking fashion
|
|
// Send as much data as we can in the non-blocking fashion
|
|
//
|
|
//
|
|
- numBytes = channel.write(call.response);
|
|
|
|
|
|
+ int numBytes = channel.write(call.response);
|
|
|
|
+ if (numBytes < 0) {
|
|
|
|
+ return true;
|
|
|
|
+ }
|
|
if (!call.response.hasRemaining()) {
|
|
if (!call.response.hasRemaining()) {
|
|
if (numElements == 1) { // last call fully processes.
|
|
if (numElements == 1) { // last call fully processes.
|
|
done = true; // no more data for this channel.
|
|
done = true; // no more data for this channel.
|
|
@@ -621,24 +624,27 @@ public abstract class Server {
|
|
// If we were unable to write the entire response out, then
|
|
// If we were unable to write the entire response out, then
|
|
// insert in Selector queue.
|
|
// insert in Selector queue.
|
|
//
|
|
//
|
|
- call.connection.responseQueue.addFirst(call);
|
|
|
|
- setPending();
|
|
|
|
- try {
|
|
|
|
- // Wakeup the thread blocked on select, only then can the call
|
|
|
|
- // to channel.register() complete.
|
|
|
|
- writeSelector.wakeup();
|
|
|
|
- SelectionKey readKey = channel.register(writeSelector,
|
|
|
|
- SelectionKey.OP_WRITE);
|
|
|
|
- readKey.attach(call);
|
|
|
|
- } finally {
|
|
|
|
- clearPending();
|
|
|
|
|
|
+ call.connection.responseQueue.addFirst(call);
|
|
|
|
+
|
|
|
|
+ if (inHandler) {
|
|
|
|
+ incPending();
|
|
|
|
+ try {
|
|
|
|
+ // Wakeup the thread blocked on select, only then can the call
|
|
|
|
+ // to channel.register() complete.
|
|
|
|
+ writeSelector.wakeup();
|
|
|
|
+ channel.register(writeSelector, SelectionKey.OP_WRITE, call);
|
|
|
|
+ } catch (ClosedChannelException e) {
|
|
|
|
+ //Its ok. channel might be closed else where.
|
|
|
|
+ done = true;
|
|
|
|
+ } finally {
|
|
|
|
+ decPending();
|
|
|
|
+ }
|
|
}
|
|
}
|
|
if (LOG.isDebugEnabled()) {
|
|
if (LOG.isDebugEnabled()) {
|
|
LOG.debug(getName() + ": responding to #" + call.id + " from " +
|
|
LOG.debug(getName() + ": responding to #" + call.id + " from " +
|
|
call.connection + " Wrote partial " + numBytes +
|
|
call.connection + " Wrote partial " + numBytes +
|
|
" bytes.");
|
|
" bytes.");
|
|
}
|
|
}
|
|
- done = false; // this call not fully processed.
|
|
|
|
}
|
|
}
|
|
error = false; // everything went off well
|
|
error = false; // everything went off well
|
|
}
|
|
}
|
|
@@ -646,11 +652,7 @@ public abstract class Server {
|
|
if (error && call != null) {
|
|
if (error && call != null) {
|
|
LOG.warn(getName()+", call " + call + ": output error");
|
|
LOG.warn(getName()+", call " + call + ": output error");
|
|
done = true; // error. no more data for this channel.
|
|
done = true; // error. no more data for this channel.
|
|
- synchronized (connectionList) {
|
|
|
|
- if (connectionList.remove(call.connection))
|
|
|
|
- numConnections--;
|
|
|
|
- }
|
|
|
|
- call.connection.close();
|
|
|
|
|
|
+ closeConnection(call.connection);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
return done;
|
|
return done;
|
|
@@ -663,22 +665,22 @@ public abstract class Server {
|
|
synchronized (call.connection.responseQueue) {
|
|
synchronized (call.connection.responseQueue) {
|
|
call.connection.responseQueue.addLast(call);
|
|
call.connection.responseQueue.addLast(call);
|
|
if (call.connection.responseQueue.size() == 1) {
|
|
if (call.connection.responseQueue.size() == 1) {
|
|
- processResponse(call.connection.responseQueue);
|
|
|
|
|
|
+ processResponse(call.connection.responseQueue, true);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
- private synchronized void setPending() { // call waiting to be enqueued.
|
|
|
|
- pending = true;
|
|
|
|
|
|
+ private synchronized void incPending() { // call waiting to be enqueued.
|
|
|
|
+ pending++;
|
|
}
|
|
}
|
|
|
|
|
|
- private synchronized void clearPending() { // call done enqueueing.
|
|
|
|
- pending = false;
|
|
|
|
|
|
+ private synchronized void decPending() { // call done enqueueing.
|
|
|
|
+ pending--;
|
|
notify();
|
|
notify();
|
|
}
|
|
}
|
|
|
|
|
|
private synchronized void waitPending() throws InterruptedException {
|
|
private synchronized void waitPending() throws InterruptedException {
|
|
- while (pending) {
|
|
|
|
|
|
+ while (pending > 0) {
|
|
wait();
|
|
wait();
|
|
}
|
|
}
|
|
}
|
|
}
|
|
@@ -691,7 +693,6 @@ public abstract class Server {
|
|
private boolean headerRead = false; //if the connection header that
|
|
private boolean headerRead = false; //if the connection header that
|
|
//follows version is read.
|
|
//follows version is read.
|
|
private SocketChannel channel;
|
|
private SocketChannel channel;
|
|
- private SelectionKey key;
|
|
|
|
private ByteBuffer data;
|
|
private ByteBuffer data;
|
|
private ByteBuffer dataLengthBuffer;
|
|
private ByteBuffer dataLengthBuffer;
|
|
private LinkedList<Call> responseQueue;
|
|
private LinkedList<Call> responseQueue;
|
|
@@ -706,7 +707,6 @@ public abstract class Server {
|
|
|
|
|
|
public Connection(SelectionKey key, SocketChannel channel,
|
|
public Connection(SelectionKey key, SocketChannel channel,
|
|
long lastContact) {
|
|
long lastContact) {
|
|
- this.key = key;
|
|
|
|
this.channel = channel;
|
|
this.channel = channel;
|
|
this.lastContact = lastContact;
|
|
this.lastContact = lastContact;
|
|
this.data = null;
|
|
this.data = null;
|
|
@@ -847,7 +847,7 @@ public abstract class Server {
|
|
|
|
|
|
}
|
|
}
|
|
|
|
|
|
- private void close() throws IOException {
|
|
|
|
|
|
+ private synchronized void close() throws IOException {
|
|
data = null;
|
|
data = null;
|
|
dataLengthBuffer = null;
|
|
dataLengthBuffer = null;
|
|
if (!channel.isOpen())
|
|
if (!channel.isOpen())
|
|
@@ -857,8 +857,6 @@ public abstract class Server {
|
|
try {channel.close();} catch(Exception e) {}
|
|
try {channel.close();} catch(Exception e) {}
|
|
}
|
|
}
|
|
try {socket.close();} catch(Exception e) {}
|
|
try {socket.close();} catch(Exception e) {}
|
|
- try {key.cancel();} catch(Exception e) {}
|
|
|
|
- key = null;
|
|
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
@@ -980,6 +978,17 @@ public abstract class Server {
|
|
responder = new Responder();
|
|
responder = new Responder();
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ private void closeConnection(Connection connection) {
|
|
|
|
+ synchronized (connectionList) {
|
|
|
|
+ if (connectionList.remove(connection))
|
|
|
|
+ numConnections--;
|
|
|
|
+ }
|
|
|
|
+ try {
|
|
|
|
+ connection.close();
|
|
|
|
+ } catch (IOException e) {
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
/** Sets the timeout used for network i/o. */
|
|
/** Sets the timeout used for network i/o. */
|
|
public void setTimeout(int timeout) { this.timeout = timeout; }
|
|
public void setTimeout(int timeout) { this.timeout = timeout; }
|
|
|
|
|