|
@@ -20,23 +20,17 @@ import java.io.IOException;
|
|
|
import java.io.EOFException;
|
|
|
import java.io.DataInputStream;
|
|
|
import java.io.DataOutputStream;
|
|
|
+import java.io.BufferedInputStream;
|
|
|
+import java.io.BufferedOutputStream;
|
|
|
import java.io.StringWriter;
|
|
|
import java.io.PrintWriter;
|
|
|
-import java.io.ByteArrayInputStream;
|
|
|
|
|
|
-import java.nio.ByteBuffer;
|
|
|
-import java.nio.channels.SelectionKey;
|
|
|
-import java.nio.channels.Selector;
|
|
|
-import java.nio.channels.ServerSocketChannel;
|
|
|
-import java.nio.channels.SocketChannel;
|
|
|
-import java.nio.BufferUnderflowException;
|
|
|
-
|
|
|
-import java.net.InetSocketAddress;
|
|
|
import java.net.Socket;
|
|
|
+import java.net.ServerSocket;
|
|
|
+import java.net.SocketException;
|
|
|
+import java.net.SocketTimeoutException;
|
|
|
|
|
|
import java.util.LinkedList;
|
|
|
-import java.util.Iterator;
|
|
|
-import java.util.Random;
|
|
|
|
|
|
import org.apache.commons.logging.*;
|
|
|
|
|
@@ -44,8 +38,7 @@ import org.apache.hadoop.conf.Configurable;
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
import org.apache.hadoop.io.Writable;
|
|
|
import org.apache.hadoop.io.WritableUtils;
|
|
|
-
|
|
|
-import org.mortbay.http.nio.SocketChannelOutputStream;
|
|
|
+import org.apache.hadoop.io.UTF8;
|
|
|
|
|
|
/** An abstract IPC service. IPC calls take a single {@link Writable} as a
|
|
|
* parameter, and return a {@link Writable} as their value. A service runs on
|
|
@@ -72,16 +65,6 @@ public abstract class Server {
|
|
|
private int handlerCount; // number of handler threads
|
|
|
private int maxQueuedCalls; // max number of queued calls
|
|
|
private Class paramClass; // class of call parameters
|
|
|
- private int maxIdleTime; // the maximum idle time after
|
|
|
- // which a client may be disconnected
|
|
|
- private int thresholdIdleConnections; // the number of idle connections
|
|
|
- // after which we will start
|
|
|
- // cleaning up idle
|
|
|
- // connections
|
|
|
- int maxConnectionsToNuke; // the max number of
|
|
|
- // connections to nuke
|
|
|
- //during a cleanup
|
|
|
-
|
|
|
private Configuration conf;
|
|
|
|
|
|
private int timeout;
|
|
@@ -90,12 +73,6 @@ public abstract class Server {
|
|
|
private LinkedList callQueue = new LinkedList(); // queued calls
|
|
|
private Object callDequeued = new Object(); // used by wait/notify
|
|
|
|
|
|
- private InetSocketAddress address; //the address we bind at
|
|
|
- private ServerSocketChannel acceptChannel = null; //the (main) accept channel
|
|
|
- private Selector selector = null; //the selector that we use for the server
|
|
|
- private Listener listener;
|
|
|
- private int numConnections = 0;
|
|
|
-
|
|
|
/** A call queued for handling. */
|
|
|
private static class Call {
|
|
|
private int id; // the client's call id
|
|
@@ -109,300 +86,113 @@ public abstract class Server {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- /** Listens on the socket. Creates jobs for the handler threads*/
|
|
|
+ /** Listens on the socket, starting new connection threads. */
|
|
|
private class Listener extends Thread {
|
|
|
-
|
|
|
- private LinkedList connectionList = new LinkedList(); //maintain a list
|
|
|
- //of client connectionss
|
|
|
- private Random rand = new Random();
|
|
|
- private long lastCleanupRunTime = 0; //the last time when a cleanup connec-
|
|
|
- //-tion (for idle connections) ran
|
|
|
- private int cleanupInterval = 10000; //the minimum interval between
|
|
|
- //two cleanup runs
|
|
|
-
|
|
|
- public Listener() {
|
|
|
- address = new InetSocketAddress(port);
|
|
|
+ private ServerSocket socket;
|
|
|
+
|
|
|
+ public Listener() throws IOException {
|
|
|
+ this.socket = new ServerSocket(port);
|
|
|
+ socket.setSoTimeout(timeout);
|
|
|
this.setDaemon(true);
|
|
|
- }
|
|
|
- /** cleanup connections from connectionList. Choose a random range
|
|
|
- * to scan and also have a limit on the number of the connections
|
|
|
- * that will be cleanedup per run. The criteria for cleanup is the time
|
|
|
- * for which the connection was idle. If 'force' is true then all
|
|
|
- * connections will be looked at for the cleanup.
|
|
|
- */
|
|
|
- private void cleanupConnections(boolean force) {
|
|
|
- if (force || numConnections > thresholdIdleConnections) {
|
|
|
- long currentTime = System.currentTimeMillis();
|
|
|
- if (!force && (int)(currentTime - lastCleanupRunTime) < cleanupInterval) {
|
|
|
- return;
|
|
|
- }
|
|
|
- int start = 0;
|
|
|
- int end = numConnections - 1;
|
|
|
- if (!force) {
|
|
|
- start = rand.nextInt() % numConnections;
|
|
|
- end = rand.nextInt() % numConnections;
|
|
|
- int temp;
|
|
|
- if (end < start) {
|
|
|
- temp = start;
|
|
|
- start = end;
|
|
|
- end = temp;
|
|
|
- }
|
|
|
- }
|
|
|
- int i = start;
|
|
|
- int numNuked = 0;
|
|
|
- while (i <= end) {
|
|
|
- Connection c = (Connection)connectionList.get(i);
|
|
|
- if (c.timedOut(currentTime)) {
|
|
|
- connectionList.remove(i);
|
|
|
- try {
|
|
|
- LOG.info(getName() + ": disconnecting client " + c.getHostAddress());
|
|
|
- c.close();
|
|
|
- } catch (Exception e) {}
|
|
|
- numNuked++;
|
|
|
- end--;
|
|
|
- if (!force && numNuked == maxConnectionsToNuke) break;
|
|
|
- }
|
|
|
- else i++;
|
|
|
- }
|
|
|
- lastCleanupRunTime = System.currentTimeMillis();
|
|
|
- }
|
|
|
+ this.setName("Server listener on port " + port);
|
|
|
}
|
|
|
|
|
|
public void run() {
|
|
|
- SERVER.set(Server.this);
|
|
|
-
|
|
|
- try {
|
|
|
- // Create a new server socket and set to non blocking mode
|
|
|
- acceptChannel = ServerSocketChannel.open();
|
|
|
- acceptChannel.configureBlocking(false);
|
|
|
-
|
|
|
- // Bind the server socket to the local host and port
|
|
|
- acceptChannel.socket().bind(address);
|
|
|
-
|
|
|
- // create a selector;
|
|
|
- selector= Selector.open();
|
|
|
-
|
|
|
- // Register accepts on the server socket with the selector.
|
|
|
- acceptChannel.register(selector, SelectionKey.OP_ACCEPT);
|
|
|
- this.setName("Server listener on port " + port);
|
|
|
-
|
|
|
- LOG.info(getName() + ": starting");
|
|
|
-
|
|
|
- while (running) {
|
|
|
- SelectionKey key = null;
|
|
|
+ LOG.info(getName() + ": starting");
|
|
|
+ while (running) {
|
|
|
+ Socket acceptedSock = null;
|
|
|
+ try {
|
|
|
+ acceptedSock = socket.accept();
|
|
|
+ new Connection(acceptedSock).start(); // start a new connection
|
|
|
+ } catch (SocketTimeoutException e) { // ignore timeouts
|
|
|
+ } catch (OutOfMemoryError e) {
|
|
|
+ // we can run out of memory if we have too many threads
|
|
|
+ // log the event and sleep for a minute and give
|
|
|
+ // some thread(s) a chance to finish
|
|
|
+ LOG.warn(getName() + " out of memory, sleeping...", e);
|
|
|
try {
|
|
|
- selector.select(timeout);
|
|
|
- Iterator iter = selector.selectedKeys().iterator();
|
|
|
-
|
|
|
- while (iter.hasNext()) {
|
|
|
- key = (SelectionKey)iter.next();
|
|
|
- if (key.isAcceptable())
|
|
|
- doAccept(key);
|
|
|
- else if (key.isReadable())
|
|
|
- doRead(key);
|
|
|
- iter.remove();
|
|
|
- key = null;
|
|
|
- }
|
|
|
- } catch (OutOfMemoryError e) {
|
|
|
- closeCurrentConnection(key, e);
|
|
|
- cleanupConnections(true);
|
|
|
+ acceptedSock.close();
|
|
|
Thread.sleep(60000);
|
|
|
- } catch (Exception e) {
|
|
|
- closeCurrentConnection(key, e);
|
|
|
- }
|
|
|
- cleanupConnections(false);
|
|
|
+ } catch (InterruptedException ie) { // ignore interrupts
|
|
|
+ } catch (IOException ioe) { // ignore IOexceptions
|
|
|
+ }
|
|
|
}
|
|
|
- } catch (Exception e) {
|
|
|
- LOG.fatal("selector",e);
|
|
|
+ catch (Exception e) { // log all other exceptions
|
|
|
+ LOG.info(getName() + " caught: " + e, e);
|
|
|
+ }
|
|
|
}
|
|
|
- LOG.info("Stopping " + this.getName());
|
|
|
-
|
|
|
try {
|
|
|
- if (acceptChannel != null)
|
|
|
- acceptChannel.close();
|
|
|
- if (selector != null)
|
|
|
- selector.close();
|
|
|
- } catch (IOException e) { }
|
|
|
-
|
|
|
- selector= null;
|
|
|
- acceptChannel= null;
|
|
|
- connectionList = null;
|
|
|
- }
|
|
|
-
|
|
|
- private void closeCurrentConnection(SelectionKey key, Throwable e) {
|
|
|
- if (running) {
|
|
|
- LOG.warn("selector: " + e);
|
|
|
- e.printStackTrace();
|
|
|
+ socket.close();
|
|
|
+ } catch (IOException e) {
|
|
|
+ LOG.info(getName() + ": e=" + e);
|
|
|
}
|
|
|
- if (key != null) {
|
|
|
- Connection c = (Connection)key.attachment();
|
|
|
- if (c != null) {
|
|
|
- connectionList.remove(c);
|
|
|
- try {
|
|
|
- LOG.info(getName() + ": disconnecting client " + c.getHostAddress());
|
|
|
- c.close();
|
|
|
- } catch (Exception ex) {}
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- void doAccept(SelectionKey key) throws IOException, OutOfMemoryError {
|
|
|
- Connection c = null;
|
|
|
- ServerSocketChannel server = (ServerSocketChannel) key.channel();
|
|
|
- SocketChannel channel = server.accept();
|
|
|
- channel.configureBlocking(false);
|
|
|
- SelectionKey readKey = channel.register(selector, SelectionKey.OP_READ);
|
|
|
- c = new Connection(readKey, channel, System.currentTimeMillis());
|
|
|
- readKey.attach(c);
|
|
|
- connectionList.addLast(c);
|
|
|
- numConnections++;
|
|
|
- LOG.info("Server connection on port " + port + " from " +
|
|
|
- c.getHostAddress() + ": starting");
|
|
|
- }
|
|
|
-
|
|
|
- void doRead(SelectionKey key) {
|
|
|
- int count = 0;
|
|
|
- if (!key.isValid() || !key.isReadable())
|
|
|
- return;
|
|
|
- Connection c = (Connection)key.attachment();
|
|
|
- if (c == null) {
|
|
|
- return;
|
|
|
- }
|
|
|
- c.setLastContact(System.currentTimeMillis());
|
|
|
-
|
|
|
- try {
|
|
|
- count = c.readAndProcess();
|
|
|
- } catch (Exception e) {
|
|
|
- LOG.info(getName() + ": readAndProcess threw exception " + e + ". Count of bytes read: " + count);
|
|
|
- count = -1; //so that the (count < 0) block is executed
|
|
|
- }
|
|
|
- if (count < 0) {
|
|
|
- connectionList.remove(c);
|
|
|
- try {
|
|
|
- LOG.info(getName() + ": disconnecting client " + c.getHostAddress());
|
|
|
- c.close();
|
|
|
- } catch (Exception e) {}
|
|
|
- }
|
|
|
- else {
|
|
|
- c.setLastContact(System.currentTimeMillis());
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- void doStop()
|
|
|
- {
|
|
|
- selector.wakeup();
|
|
|
- Thread.yield();
|
|
|
+ LOG.info(getName() + ": exiting");
|
|
|
}
|
|
|
}
|
|
|
|
|
|
/** Reads calls from a connection and queues them for handling. */
|
|
|
- private class Connection {
|
|
|
- private SocketChannel channel;
|
|
|
- private SelectionKey key;
|
|
|
- private ByteBuffer data;
|
|
|
- private ByteBuffer dataLengthBuffer;
|
|
|
- private DataOutputStream out;
|
|
|
- private long lastContact;
|
|
|
- private int dataLength;
|
|
|
+ private class Connection extends Thread {
|
|
|
private Socket socket;
|
|
|
+ private DataInputStream in;
|
|
|
+ private DataOutputStream out;
|
|
|
|
|
|
- public Connection(SelectionKey key, SocketChannel channel,
|
|
|
- long lastContact) {
|
|
|
- this.key = key;
|
|
|
- this.channel = channel;
|
|
|
- this.lastContact = lastContact;
|
|
|
- this.data = null;
|
|
|
- this.dataLengthBuffer = null;
|
|
|
- this.socket = channel.socket();
|
|
|
+ public Connection(Socket socket) throws IOException {
|
|
|
+ this.socket = socket;
|
|
|
+ socket.setSoTimeout(timeout);
|
|
|
+ this.in = new DataInputStream
|
|
|
+ (new BufferedInputStream(socket.getInputStream()));
|
|
|
this.out = new DataOutputStream
|
|
|
- (new SocketChannelOutputStream(channel, 4096));
|
|
|
- }
|
|
|
-
|
|
|
- public String getHostAddress() {
|
|
|
- return socket.getInetAddress().getHostAddress();
|
|
|
- }
|
|
|
-
|
|
|
- public void setLastContact(long lastContact) {
|
|
|
- this.lastContact = lastContact;
|
|
|
- }
|
|
|
-
|
|
|
- public long getLastContact() {
|
|
|
- return lastContact;
|
|
|
- }
|
|
|
-
|
|
|
- private boolean timedOut() {
|
|
|
- if(System.currentTimeMillis() - lastContact > maxIdleTime)
|
|
|
- return true;
|
|
|
- return false;
|
|
|
- }
|
|
|
-
|
|
|
- private boolean timedOut(long currentTime) {
|
|
|
- if(currentTime - lastContact > timeout)
|
|
|
- return true;
|
|
|
- return false;
|
|
|
- }
|
|
|
-
|
|
|
- public int readAndProcess() throws IOException, InterruptedException {
|
|
|
- int count = -1;
|
|
|
- if (dataLengthBuffer == null)
|
|
|
- dataLengthBuffer = ByteBuffer.allocateDirect(4);
|
|
|
- if (dataLengthBuffer.remaining() > 0) {
|
|
|
- count = channel.read(dataLengthBuffer);
|
|
|
- if (count < 0) return count;
|
|
|
- if (dataLengthBuffer.remaining() == 0) {
|
|
|
- dataLengthBuffer.flip();
|
|
|
- dataLength = dataLengthBuffer.getInt();
|
|
|
- data = ByteBuffer.allocateDirect(dataLength);
|
|
|
- }
|
|
|
- return count;
|
|
|
- }
|
|
|
- count = channel.read(data);
|
|
|
- if (data.remaining() == 0) {
|
|
|
- data.flip();
|
|
|
- processData();
|
|
|
- data = dataLengthBuffer = null;
|
|
|
- }
|
|
|
- return count;
|
|
|
+ (new BufferedOutputStream(socket.getOutputStream()));
|
|
|
+ this.setDaemon(true);
|
|
|
+ this.setName("Server connection on port " + port + " from "
|
|
|
+ + socket.getInetAddress().getHostAddress());
|
|
|
}
|
|
|
|
|
|
- private void processData() throws IOException, InterruptedException {
|
|
|
- byte[] bytes = new byte[dataLength];
|
|
|
- data.get(bytes);
|
|
|
- DataInputStream dis = new DataInputStream(new ByteArrayInputStream(bytes));
|
|
|
- int id = dis.readInt(); // try to read an id
|
|
|
+ public void run() {
|
|
|
+ LOG.info(getName() + ": starting");
|
|
|
+ SERVER.set(Server.this);
|
|
|
+ try {
|
|
|
+ while (running) {
|
|
|
+ int id;
|
|
|
+ try {
|
|
|
+ id = in.readInt(); // try to read an id
|
|
|
+ } catch (SocketTimeoutException e) {
|
|
|
+ continue;
|
|
|
+ }
|
|
|
|
|
|
- if (LOG.isDebugEnabled())
|
|
|
- LOG.debug(" got #" + id);
|
|
|
-
|
|
|
- Writable param = makeParam(); // read param
|
|
|
- param.readFields(dis);
|
|
|
+ if (LOG.isDebugEnabled())
|
|
|
+ LOG.debug(getName() + " got #" + id);
|
|
|
|
|
|
- Call call = new Call(id, param, this);
|
|
|
- synchronized (callQueue) {
|
|
|
- callQueue.addLast(call); // queue the call
|
|
|
- callQueue.notify(); // wake up a waiting handler
|
|
|
- }
|
|
|
+ Writable param = makeParam(); // read param
|
|
|
+ param.readFields(in);
|
|
|
+
|
|
|
+ Call call = new Call(id, param, this);
|
|
|
+
|
|
|
+ synchronized (callQueue) {
|
|
|
+ callQueue.addLast(call); // queue the call
|
|
|
+ callQueue.notify(); // wake up a waiting handler
|
|
|
+ }
|
|
|
|
|
|
- while (running && callQueue.size() >= maxQueuedCalls) {
|
|
|
- synchronized (callDequeued) { // queue is full
|
|
|
- callDequeued.wait(timeout); // wait for a dequeue
|
|
|
+ while (running && callQueue.size() >= maxQueuedCalls) {
|
|
|
+ synchronized (callDequeued) { // queue is full
|
|
|
+ callDequeued.wait(timeout); // wait for a dequeue
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
+ } catch (EOFException eof) {
|
|
|
+ // This is what happens on linux when the other side shuts down
|
|
|
+ } catch (SocketException eof) {
|
|
|
+ // This is what happens on Win32 when the other side shuts down
|
|
|
+ } catch (Exception e) {
|
|
|
+ LOG.info(getName() + " caught: " + e, e);
|
|
|
+ } finally {
|
|
|
+ try {
|
|
|
+ socket.close();
|
|
|
+ } catch (IOException e) {}
|
|
|
+ LOG.info(getName() + ": exiting");
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private void close() throws IOException {
|
|
|
- data = null;
|
|
|
- dataLengthBuffer = null;
|
|
|
- if (!channel.isOpen())
|
|
|
- return;
|
|
|
- socket.shutdownOutput();
|
|
|
- channel.close();
|
|
|
- socket.close();
|
|
|
- channel.close();
|
|
|
- out.close();
|
|
|
- key.cancel();
|
|
|
- numConnections--;
|
|
|
- }
|
|
|
}
|
|
|
|
|
|
/** Handles queued calls . */
|
|
@@ -455,6 +245,7 @@ public abstract class Server {
|
|
|
WritableUtils.writeString(out, errorClass);
|
|
|
WritableUtils.writeString(out, error);
|
|
|
}
|
|
|
+ out.flush();
|
|
|
}
|
|
|
|
|
|
} catch (Exception e) {
|
|
@@ -484,10 +275,7 @@ public abstract class Server {
|
|
|
this.paramClass = paramClass;
|
|
|
this.handlerCount = handlerCount;
|
|
|
this.maxQueuedCalls = handlerCount;
|
|
|
- this.timeout = conf.getInt("ipc.client.timeout",10000);
|
|
|
- this.maxIdleTime = conf.getInt("ipc.client.maxidletime", 120000);
|
|
|
- this.maxConnectionsToNuke = conf.getInt("ipc.client.kill.max", 10);
|
|
|
- this.thresholdIdleConnections = conf.getInt("ipc.client.idlethreshold", 120000);
|
|
|
+ this.timeout = conf.getInt("ipc.client.timeout",10000);
|
|
|
}
|
|
|
|
|
|
/** Sets the timeout used for network i/o. */
|
|
@@ -495,7 +283,7 @@ public abstract class Server {
|
|
|
|
|
|
/** Starts the service. Must be called before any calls will be handled. */
|
|
|
public synchronized void start() throws IOException {
|
|
|
- listener = new Listener();
|
|
|
+ Listener listener = new Listener();
|
|
|
listener.start();
|
|
|
|
|
|
for (int i = 0; i < handlerCount; i++) {
|
|
@@ -510,7 +298,6 @@ public abstract class Server {
|
|
|
public synchronized void stop() {
|
|
|
LOG.info("Stopping server on " + port);
|
|
|
running = false;
|
|
|
- listener.doStop();
|
|
|
try {
|
|
|
Thread.sleep(timeout); // inexactly wait for pending requests to finish
|
|
|
} catch (InterruptedException e) {}
|