|
@@ -20,17 +20,26 @@ import java.io.IOException;
|
|
|
import java.io.EOFException;
|
|
|
import java.io.DataInputStream;
|
|
|
import java.io.DataOutputStream;
|
|
|
-import java.io.BufferedInputStream;
|
|
|
import java.io.BufferedOutputStream;
|
|
|
import java.io.StringWriter;
|
|
|
import java.io.PrintWriter;
|
|
|
+import java.io.ByteArrayInputStream;
|
|
|
|
|
|
+import java.nio.ByteBuffer;
|
|
|
+import java.nio.channels.SelectionKey;
|
|
|
+import java.nio.channels.Selector;
|
|
|
+import java.nio.channels.ServerSocketChannel;
|
|
|
+import java.nio.channels.SocketChannel;
|
|
|
+import java.nio.BufferUnderflowException;
|
|
|
+
|
|
|
+import java.net.InetSocketAddress;
|
|
|
import java.net.Socket;
|
|
|
-import java.net.ServerSocket;
|
|
|
-import java.net.SocketException;
|
|
|
-import java.net.SocketTimeoutException;
|
|
|
|
|
|
+import java.util.Collections;
|
|
|
import java.util.LinkedList;
|
|
|
+import java.util.List;
|
|
|
+import java.util.Iterator;
|
|
|
+import java.util.Random;
|
|
|
|
|
|
import org.apache.commons.logging.*;
|
|
|
|
|
@@ -38,7 +47,8 @@ import org.apache.hadoop.conf.Configurable;
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
import org.apache.hadoop.io.Writable;
|
|
|
import org.apache.hadoop.io.WritableUtils;
|
|
|
-import org.apache.hadoop.io.UTF8;
|
|
|
+
|
|
|
+import org.mortbay.http.nio.SocketChannelOutputStream;
|
|
|
|
|
|
/** An abstract IPC service. IPC calls take a single {@link Writable} as a
|
|
|
* parameter, and return a {@link Writable} as their value. A service runs on
|
|
@@ -65,6 +75,16 @@ public abstract class Server {
|
|
|
private int handlerCount; // number of handler threads
|
|
|
private int maxQueuedCalls; // max number of queued calls
|
|
|
private Class paramClass; // class of call parameters
|
|
|
+ private int maxIdleTime; // the maximum idle time after
|
|
|
+ // which a client may be disconnected
|
|
|
+ private int thresholdIdleConnections; // the number of idle connections
|
|
|
+ // after which we will start
|
|
|
+ // cleaning up idle
|
|
|
+ // connections
|
|
|
+ int maxConnectionsToNuke; // the max number of
|
|
|
+ // connections to nuke
|
|
|
+ //during a cleanup
|
|
|
+
|
|
|
private Configuration conf;
|
|
|
|
|
|
private int timeout;
|
|
@@ -73,6 +93,12 @@ public abstract class Server {
|
|
|
private LinkedList callQueue = new LinkedList(); // queued calls
|
|
|
private Object callDequeued = new Object(); // used by wait/notify
|
|
|
|
|
|
+ private List connectionList =
|
|
|
+ Collections.synchronizedList(new LinkedList()); //maintain a list
|
|
|
+ //of client connectionss
|
|
|
+ private Listener listener;
|
|
|
+ private int numConnections = 0;
|
|
|
+
|
|
|
/** A call queued for handling. */
|
|
|
private static class Call {
|
|
|
private int id; // the client's call id
|
|
@@ -86,113 +112,323 @@ public abstract class Server {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- /** Listens on the socket, starting new connection threads. */
|
|
|
+ /** Listens on the socket. Creates jobs for the handler threads*/
|
|
|
private class Listener extends Thread {
|
|
|
- private ServerSocket socket;
|
|
|
-
|
|
|
+
|
|
|
+ private ServerSocketChannel acceptChannel = null; //the accept channel
|
|
|
+ private Selector selector = null; //the selector that we use for the server
|
|
|
+ private InetSocketAddress address; //the address we bind at
|
|
|
+ private Random rand = new Random();
|
|
|
+ private long lastCleanupRunTime = 0; //the last time when a cleanup connec-
|
|
|
+ //-tion (for idle connections) ran
|
|
|
+ private long cleanupInterval = 10000; //the minimum interval between
|
|
|
+ //two cleanup runs
|
|
|
+
|
|
|
public Listener() throws IOException {
|
|
|
- this.socket = new ServerSocket(port);
|
|
|
- socket.setSoTimeout(timeout);
|
|
|
- this.setDaemon(true);
|
|
|
+ address = new InetSocketAddress(port);
|
|
|
+ // Create a new server socket and set to non blocking mode
|
|
|
+ acceptChannel = ServerSocketChannel.open();
|
|
|
+ acceptChannel.configureBlocking(false);
|
|
|
+
|
|
|
+ // Bind the server socket to the local host and port
|
|
|
+ acceptChannel.socket().bind(address);
|
|
|
+ // create a selector;
|
|
|
+ selector= Selector.open();
|
|
|
+
|
|
|
+ // Register accepts on the server socket with the selector.
|
|
|
+ acceptChannel.register(selector, SelectionKey.OP_ACCEPT);
|
|
|
this.setName("Server listener on port " + port);
|
|
|
+ this.setDaemon(true);
|
|
|
+ }
|
|
|
+ /** cleanup connections from connectionList. Choose a random range
|
|
|
+ * to scan and also have a limit on the number of the connections
|
|
|
+ * that will be cleanedup per run. The criteria for cleanup is the time
|
|
|
+ * for which the connection was idle. If 'force' is true then all
|
|
|
+ * connections will be looked at for the cleanup.
|
|
|
+ */
|
|
|
+ private void cleanupConnections(boolean force) {
|
|
|
+ if (force || numConnections > thresholdIdleConnections) {
|
|
|
+ long currentTime = System.currentTimeMillis();
|
|
|
+ if (!force && (currentTime - lastCleanupRunTime) < cleanupInterval) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ int start = 0;
|
|
|
+ int end = numConnections - 1;
|
|
|
+ if (!force) {
|
|
|
+ start = rand.nextInt() % numConnections;
|
|
|
+ end = rand.nextInt() % numConnections;
|
|
|
+ int temp;
|
|
|
+ if (end < start) {
|
|
|
+ temp = start;
|
|
|
+ start = end;
|
|
|
+ end = temp;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ int i = start;
|
|
|
+ int numNuked = 0;
|
|
|
+ while (i <= end) {
|
|
|
+ Connection c;
|
|
|
+ synchronized (connectionList) {
|
|
|
+ try {
|
|
|
+ c = (Connection)connectionList.get(i);
|
|
|
+ } catch (Exception e) {return;}
|
|
|
+ }
|
|
|
+ if (c.timedOut(currentTime)) {
|
|
|
+ synchronized (connectionList) {
|
|
|
+ if (connectionList.remove(c))
|
|
|
+ numConnections--;
|
|
|
+ }
|
|
|
+ try {
|
|
|
+ LOG.info(getName() + ": disconnecting client " + c.getHostAddress());
|
|
|
+ c.close();
|
|
|
+ } catch (Exception e) {}
|
|
|
+ numNuked++;
|
|
|
+ end--;
|
|
|
+ c = null;
|
|
|
+ if (!force && numNuked == maxConnectionsToNuke) break;
|
|
|
+ }
|
|
|
+ else i++;
|
|
|
+ }
|
|
|
+ lastCleanupRunTime = System.currentTimeMillis();
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
public void run() {
|
|
|
LOG.info(getName() + ": starting");
|
|
|
+ SERVER.set(Server.this);
|
|
|
while (running) {
|
|
|
- Socket acceptedSock = null;
|
|
|
+ SelectionKey key = null;
|
|
|
try {
|
|
|
- acceptedSock = socket.accept();
|
|
|
- new Connection(acceptedSock).start(); // start a new connection
|
|
|
- } catch (SocketTimeoutException e) { // ignore timeouts
|
|
|
+ selector.select();
|
|
|
+ Iterator iter = selector.selectedKeys().iterator();
|
|
|
+
|
|
|
+ while (iter.hasNext()) {
|
|
|
+ key = (SelectionKey)iter.next();
|
|
|
+ if (key.isAcceptable())
|
|
|
+ doAccept(key);
|
|
|
+ else if (key.isReadable())
|
|
|
+ doRead(key);
|
|
|
+ iter.remove();
|
|
|
+ key = null;
|
|
|
+ }
|
|
|
} catch (OutOfMemoryError e) {
|
|
|
// we can run out of memory if we have too many threads
|
|
|
// log the event and sleep for a minute and give
|
|
|
// some thread(s) a chance to finish
|
|
|
- LOG.warn(getName() + " out of memory, sleeping...", e);
|
|
|
+ closeCurrentConnection(key, e);
|
|
|
+ cleanupConnections(true);
|
|
|
+ try { Thread.sleep(60000); } catch (Exception ie) {}
|
|
|
+ } catch (Exception e) {
|
|
|
+ closeCurrentConnection(key, e);
|
|
|
+ }
|
|
|
+ cleanupConnections(false);
|
|
|
+ }
|
|
|
+ LOG.info("Stopping " + this.getName());
|
|
|
+
|
|
|
+ try {
|
|
|
+ if (acceptChannel != null)
|
|
|
+ acceptChannel.close();
|
|
|
+ if (selector != null)
|
|
|
+ selector.close();
|
|
|
+ } catch (IOException e) { }
|
|
|
+
|
|
|
+ selector= null;
|
|
|
+ acceptChannel= null;
|
|
|
+ connectionList = null;
|
|
|
+ }
|
|
|
+
|
|
|
+ private void closeCurrentConnection(SelectionKey key, Throwable e) {
|
|
|
+ if (running) {
|
|
|
+ LOG.warn("selector: " + e);
|
|
|
+ e.printStackTrace();
|
|
|
+ }
|
|
|
+ if (key != null) {
|
|
|
+ Connection c = (Connection)key.attachment();
|
|
|
+ if (c != null) {
|
|
|
+ synchronized (connectionList) {
|
|
|
+ if (connectionList.remove(c))
|
|
|
+ numConnections--;
|
|
|
+ }
|
|
|
try {
|
|
|
- acceptedSock.close();
|
|
|
- Thread.sleep(60000);
|
|
|
- } catch (InterruptedException ie) { // ignore interrupts
|
|
|
- } catch (IOException ioe) { // ignore IOexceptions
|
|
|
- }
|
|
|
+ LOG.info(getName() + ": disconnecting client " + c.getHostAddress());
|
|
|
+ c.close();
|
|
|
+ } catch (Exception ex) {}
|
|
|
+ c = null;
|
|
|
}
|
|
|
- catch (Exception e) { // log all other exceptions
|
|
|
- LOG.info(getName() + " caught: " + e, e);
|
|
|
- }
|
|
|
}
|
|
|
+ }
|
|
|
+
|
|
|
+ void doAccept(SelectionKey key) throws IOException, OutOfMemoryError {
|
|
|
+ Connection c = null;
|
|
|
+ ServerSocketChannel server = (ServerSocketChannel) key.channel();
|
|
|
+ SocketChannel channel = server.accept();
|
|
|
+ channel.configureBlocking(false);
|
|
|
+ SelectionKey readKey = channel.register(selector, SelectionKey.OP_READ);
|
|
|
+ c = new Connection(readKey, channel, System.currentTimeMillis());
|
|
|
+ readKey.attach(c);
|
|
|
+ synchronized (connectionList) {
|
|
|
+ connectionList.add(numConnections, c);
|
|
|
+ numConnections++;
|
|
|
+ }
|
|
|
+ LOG.info("Server connection on port " + port + " from " +
|
|
|
+ c.getHostAddress() +
|
|
|
+ ": starting. Number of active connections: " + numConnections);
|
|
|
+ }
|
|
|
+
|
|
|
+ void doRead(SelectionKey key) {
|
|
|
+ int count = 0;
|
|
|
+ if (!key.isValid() || !key.isReadable())
|
|
|
+ return;
|
|
|
+ Connection c = (Connection)key.attachment();
|
|
|
+ if (c == null) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ c.setLastContact(System.currentTimeMillis());
|
|
|
+
|
|
|
try {
|
|
|
- socket.close();
|
|
|
- } catch (IOException e) {
|
|
|
- LOG.info(getName() + ": e=" + e);
|
|
|
+ count = c.readAndProcess();
|
|
|
+ } catch (Exception e) {
|
|
|
+ LOG.info(getName() + ": readAndProcess threw exception " + e + ". Count of bytes read: " + count);
|
|
|
+ e.printStackTrace();
|
|
|
+ count = -1; //so that the (count < 0) block is executed
|
|
|
}
|
|
|
- LOG.info(getName() + ": exiting");
|
|
|
+ if (count < 0) {
|
|
|
+ synchronized (connectionList) {
|
|
|
+ if (connectionList.remove(c))
|
|
|
+ numConnections--;
|
|
|
+ }
|
|
|
+ try {
|
|
|
+ LOG.info(getName() + ": disconnecting client " +
|
|
|
+ c.getHostAddress() + ". Number of active connections: "+
|
|
|
+ numConnections);
|
|
|
+ c.close();
|
|
|
+ } catch (Exception e) {}
|
|
|
+ c = null;
|
|
|
+ }
|
|
|
+ else {
|
|
|
+ c.setLastContact(System.currentTimeMillis());
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ void doStop()
|
|
|
+ {
|
|
|
+ selector.wakeup();
|
|
|
+ Thread.yield();
|
|
|
}
|
|
|
}
|
|
|
|
|
|
/** Reads calls from a connection and queues them for handling. */
|
|
|
- private class Connection extends Thread {
|
|
|
- private Socket socket;
|
|
|
- private DataInputStream in;
|
|
|
+ private class Connection {
|
|
|
+ private SocketChannel channel;
|
|
|
+ private SelectionKey key;
|
|
|
+ private ByteBuffer data;
|
|
|
+ private ByteBuffer dataLengthBuffer;
|
|
|
private DataOutputStream out;
|
|
|
+ private SocketChannelOutputStream channelOut;
|
|
|
+ private long lastContact;
|
|
|
+ private int dataLength;
|
|
|
+ private Socket socket;
|
|
|
|
|
|
- public Connection(Socket socket) throws IOException {
|
|
|
- this.socket = socket;
|
|
|
- socket.setSoTimeout(timeout);
|
|
|
- this.in = new DataInputStream
|
|
|
- (new BufferedInputStream(socket.getInputStream()));
|
|
|
+ public Connection(SelectionKey key, SocketChannel channel,
|
|
|
+ long lastContact) {
|
|
|
+ this.key = key;
|
|
|
+ this.channel = channel;
|
|
|
+ this.lastContact = lastContact;
|
|
|
+ this.data = null;
|
|
|
+ this.dataLengthBuffer = null;
|
|
|
+ this.socket = channel.socket();
|
|
|
this.out = new DataOutputStream
|
|
|
- (new BufferedOutputStream(socket.getOutputStream()));
|
|
|
- this.setDaemon(true);
|
|
|
- this.setName("Server connection on port " + port + " from "
|
|
|
- + socket.getInetAddress().getHostAddress());
|
|
|
+ (new BufferedOutputStream(
|
|
|
+ this.channelOut = new SocketChannelOutputStream(channel, 4096)));
|
|
|
+ }
|
|
|
+
|
|
|
+ public String getHostAddress() {
|
|
|
+ return socket.getInetAddress().getHostAddress();
|
|
|
}
|
|
|
|
|
|
- public void run() {
|
|
|
- LOG.info(getName() + ": starting");
|
|
|
- SERVER.set(Server.this);
|
|
|
- try {
|
|
|
- while (running) {
|
|
|
- int id;
|
|
|
- try {
|
|
|
- id = in.readInt(); // try to read an id
|
|
|
- } catch (SocketTimeoutException e) {
|
|
|
- continue;
|
|
|
- }
|
|
|
-
|
|
|
- if (LOG.isDebugEnabled())
|
|
|
- LOG.debug(getName() + " got #" + id);
|
|
|
-
|
|
|
- Writable param = makeParam(); // read param
|
|
|
- param.readFields(in);
|
|
|
+ public void setLastContact(long lastContact) {
|
|
|
+ this.lastContact = lastContact;
|
|
|
+ }
|
|
|
+
|
|
|
+ public long getLastContact() {
|
|
|
+ return lastContact;
|
|
|
+ }
|
|
|
+
|
|
|
+ private boolean timedOut() {
|
|
|
+ if(System.currentTimeMillis() - lastContact > maxIdleTime)
|
|
|
+ return true;
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+
|
|
|
+ private boolean timedOut(long currentTime) {
|
|
|
+ if(currentTime - lastContact > maxIdleTime)
|
|
|
+ return true;
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+
|
|
|
+ public int readAndProcess() throws IOException, InterruptedException {
|
|
|
+ int count = -1;
|
|
|
+ if (dataLengthBuffer == null)
|
|
|
+ dataLengthBuffer = ByteBuffer.allocateDirect(4);
|
|
|
+ if (dataLengthBuffer.remaining() > 0) {
|
|
|
+ count = channel.read(dataLengthBuffer);
|
|
|
+ if (count < 0) return count;
|
|
|
+ if (dataLengthBuffer.remaining() == 0) {
|
|
|
+ dataLengthBuffer.flip();
|
|
|
+ dataLength = dataLengthBuffer.getInt();
|
|
|
+ data = ByteBuffer.allocateDirect(dataLength);
|
|
|
+ }
|
|
|
+ //return count;
|
|
|
+ }
|
|
|
+ count = channel.read(data);
|
|
|
+ if (data.remaining() == 0) {
|
|
|
+ data.flip();
|
|
|
+ processData();
|
|
|
+ data = dataLengthBuffer = null;
|
|
|
+ }
|
|
|
+ return count;
|
|
|
+ }
|
|
|
+
|
|
|
+ private void processData() throws IOException, InterruptedException {
|
|
|
+ byte[] bytes = new byte[dataLength];
|
|
|
+ data.get(bytes);
|
|
|
+ DataInputStream dis = new DataInputStream(new ByteArrayInputStream(bytes));
|
|
|
+ int id = dis.readInt(); // try to read an id
|
|
|
|
|
|
- Call call = new Call(id, param, this);
|
|
|
+ if (LOG.isDebugEnabled())
|
|
|
+ LOG.debug(" got #" + id);
|
|
|
+
|
|
|
+ Writable param = makeParam(); // read param
|
|
|
+ param.readFields(dis);
|
|
|
|
|
|
- synchronized (callQueue) {
|
|
|
- callQueue.addLast(call); // queue the call
|
|
|
- callQueue.notify(); // wake up a waiting handler
|
|
|
- }
|
|
|
+ Call call = new Call(id, param, this);
|
|
|
+ synchronized (callQueue) {
|
|
|
+ callQueue.addLast(call); // queue the call
|
|
|
+ callQueue.notify(); // wake up a waiting handler
|
|
|
+ }
|
|
|
|
|
|
- while (running && callQueue.size() >= maxQueuedCalls) {
|
|
|
- synchronized (callDequeued) { // queue is full
|
|
|
- callDequeued.wait(timeout); // wait for a dequeue
|
|
|
- }
|
|
|
- }
|
|
|
+ while (running && callQueue.size() >= maxQueuedCalls) {
|
|
|
+ synchronized (callDequeued) { // queue is full
|
|
|
+ callDequeued.wait(timeout); // wait for a dequeue
|
|
|
}
|
|
|
- } catch (EOFException eof) {
|
|
|
- // This is what happens on linux when the other side shuts down
|
|
|
- } catch (SocketException eof) {
|
|
|
- // This is what happens on Win32 when the other side shuts down
|
|
|
- } catch (Exception e) {
|
|
|
- LOG.info(getName() + " caught: " + e, e);
|
|
|
- } finally {
|
|
|
- try {
|
|
|
- socket.close();
|
|
|
- } catch (IOException e) {}
|
|
|
- LOG.info(getName() + ": exiting");
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ private void close() throws IOException {
|
|
|
+ data = null;
|
|
|
+ dataLengthBuffer = null;
|
|
|
+ if (!channel.isOpen())
|
|
|
+ return;
|
|
|
+ try {socket.shutdownOutput();} catch(Exception e) {}
|
|
|
+ try {out.close();} catch(Exception e) {}
|
|
|
+ try {channelOut.destroy();} catch(Exception e) {}
|
|
|
+ if (channel.isOpen()) {
|
|
|
+ try {channel.close();} catch(Exception e) {}
|
|
|
+ }
|
|
|
+ try {socket.close();} catch(Exception e) {}
|
|
|
+ try {key.cancel();} catch(Exception e) {}
|
|
|
+ key = null;
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
/** Handles queued calls . */
|
|
@@ -237,15 +473,24 @@ public abstract class Server {
|
|
|
|
|
|
DataOutputStream out = call.connection.out;
|
|
|
synchronized (out) {
|
|
|
- out.writeInt(call.id); // write call id
|
|
|
- out.writeBoolean(error!=null); // write error flag
|
|
|
- if (error == null) {
|
|
|
- value.write(out);
|
|
|
- } else {
|
|
|
- WritableUtils.writeString(out, errorClass);
|
|
|
- WritableUtils.writeString(out, error);
|
|
|
+ try {
|
|
|
+ out.writeInt(call.id); // write call id
|
|
|
+ out.writeBoolean(error!=null); // write error flag
|
|
|
+ if (error == null) {
|
|
|
+ value.write(out);
|
|
|
+ } else {
|
|
|
+ WritableUtils.writeString(out, errorClass);
|
|
|
+ WritableUtils.writeString(out, error);
|
|
|
+ }
|
|
|
+ out.flush();
|
|
|
+ } catch (Exception e) {
|
|
|
+ e.printStackTrace();
|
|
|
+ synchronized (connectionList) {
|
|
|
+ if (connectionList.remove(call.connection))
|
|
|
+ numConnections--;
|
|
|
+ }
|
|
|
+ call.connection.close();
|
|
|
}
|
|
|
- out.flush();
|
|
|
}
|
|
|
|
|
|
} catch (Exception e) {
|
|
@@ -275,7 +520,10 @@ public abstract class Server {
|
|
|
this.paramClass = paramClass;
|
|
|
this.handlerCount = handlerCount;
|
|
|
this.maxQueuedCalls = handlerCount;
|
|
|
- this.timeout = conf.getInt("ipc.client.timeout",10000);
|
|
|
+ this.timeout = conf.getInt("ipc.client.timeout",10000);
|
|
|
+ this.maxIdleTime = conf.getInt("ipc.client.maxidletime", 120000);
|
|
|
+ this.maxConnectionsToNuke = conf.getInt("ipc.client.kill.max", 10);
|
|
|
+ this.thresholdIdleConnections = conf.getInt("ipc.client.idlethreshold", 4000);
|
|
|
}
|
|
|
|
|
|
/** Sets the timeout used for network i/o. */
|
|
@@ -283,7 +531,7 @@ public abstract class Server {
|
|
|
|
|
|
/** Starts the service. Must be called before any calls will be handled. */
|
|
|
public synchronized void start() throws IOException {
|
|
|
- Listener listener = new Listener();
|
|
|
+ listener = new Listener();
|
|
|
listener.start();
|
|
|
|
|
|
for (int i = 0; i < handlerCount; i++) {
|
|
@@ -298,6 +546,7 @@ public abstract class Server {
|
|
|
public synchronized void stop() {
|
|
|
LOG.info("Stopping server on " + port);
|
|
|
running = false;
|
|
|
+ listener.doStop();
|
|
|
try {
|
|
|
Thread.sleep(timeout); // inexactly wait for pending requests to finish
|
|
|
} catch (InterruptedException e) {}
|