|
@@ -23,6 +23,7 @@ import java.io.DataInputStream;
|
|
|
import java.io.DataOutputStream;
|
|
|
import java.io.BufferedOutputStream;
|
|
|
import java.io.ByteArrayInputStream;
|
|
|
+import java.io.ByteArrayOutputStream;
|
|
|
|
|
|
import java.nio.ByteBuffer;
|
|
|
import java.nio.channels.SelectionKey;
|
|
@@ -44,12 +45,13 @@ import java.util.List;
|
|
|
import java.util.Iterator;
|
|
|
import java.util.Random;
|
|
|
|
|
|
-import org.apache.commons.logging.*;
|
|
|
-
|
|
|
+import org.apache.commons.logging.Log;
|
|
|
+import org.apache.commons.logging.LogFactory;
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
import org.apache.hadoop.io.ObjectWritable;
|
|
|
import org.apache.hadoop.io.Writable;
|
|
|
import org.apache.hadoop.io.WritableUtils;
|
|
|
+import org.apache.hadoop.util.ReflectionUtils;
|
|
|
import org.apache.hadoop.ipc.SocketChannelOutputStream;
|
|
|
import org.apache.hadoop.security.UserGroupInformation;
|
|
|
import org.apache.hadoop.util.*;
|
|
@@ -145,6 +147,7 @@ public abstract class Server {
|
|
|
private int timeout;
|
|
|
private long maxCallStartAge;
|
|
|
private int maxQueueSize;
|
|
|
+ private int socketSendBufferSize;
|
|
|
|
|
|
volatile private boolean running = true; // true while server runs
|
|
|
private LinkedList<Call> callQueue = new LinkedList<Call>(); // queued calls
|
|
@@ -154,6 +157,7 @@ public abstract class Server {
|
|
|
//maintain a list
|
|
|
//of client connections
|
|
|
private Listener listener = null;
|
|
|
+ private Responder responder = null;
|
|
|
private int numConnections = 0;
|
|
|
private Handler[] handlers = null;
|
|
|
|
|
@@ -191,17 +195,24 @@ public abstract class Server {
|
|
|
private Writable param; // the parameter passed
|
|
|
private Connection connection; // connection to client
|
|
|
private long receivedTime; // the time received
|
|
|
+ private ByteBuffer response; // the response for this call
|
|
|
|
|
|
public Call(int id, Writable param, Connection connection) {
|
|
|
this.id = id;
|
|
|
this.param = param;
|
|
|
this.connection = connection;
|
|
|
this.receivedTime = System.currentTimeMillis();
|
|
|
+ this.response = null;
|
|
|
}
|
|
|
|
|
|
+ @Override
|
|
|
public String toString() {
|
|
|
return param.toString() + " from " + connection.toString();
|
|
|
}
|
|
|
+
|
|
|
+ public void setResponse(ByteBuffer response) {
|
|
|
+ this.response = response;
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
/** Listens on the socket. Creates jobs for the handler threads*/
|
|
@@ -288,6 +299,7 @@ public abstract class Server {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ @Override
|
|
|
public void run() {
|
|
|
LOG.info(getName() + ": starting");
|
|
|
SERVER.set(Server.this);
|
|
@@ -428,6 +440,234 @@ public abstract class Server {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ // Sends responses of RPC back to clients.
|
|
|
+ private class Responder extends Thread {
|
|
|
+ private Selector writeSelector;
|
|
|
+ private boolean pending; // call waiting to be enqueued
|
|
|
+
|
|
|
+ Responder() throws IOException {
|
|
|
+ this.setName("IPC Server Responder");
|
|
|
+ this.setDaemon(true);
|
|
|
+ writeSelector = Selector.open(); // create a selector
|
|
|
+ pending = false;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void run() {
|
|
|
+ LOG.info(getName() + ": starting");
|
|
|
+ SERVER.set(Server.this);
|
|
|
+ long lastPurgeTime = 0; // last check for old calls.
|
|
|
+
|
|
|
+ while (running) {
|
|
|
+ SelectionKey key = null;
|
|
|
+ try {
|
|
|
+ waitPending(); // If a channel is being registered, wait.
|
|
|
+ writeSelector.select(maxCallStartAge);
|
|
|
+ Iterator iter = writeSelector.selectedKeys().iterator();
|
|
|
+ while (iter.hasNext()) {
|
|
|
+ key = (SelectionKey)iter.next();
|
|
|
+ iter.remove();
|
|
|
+ try {
|
|
|
+ if (key.isValid() && key.isWritable()) {
|
|
|
+ doAsyncWrite(key);
|
|
|
+ }
|
|
|
+ } catch (IOException e) {
|
|
|
+ LOG.info(getName() + ": doAsyncWrite threw exception " + e);
|
|
|
+ key.cancel();
|
|
|
+ }
|
|
|
+ key = null;
|
|
|
+ }
|
|
|
+ long now = System.currentTimeMillis();
|
|
|
+ if (now < lastPurgeTime + maxCallStartAge) {
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ lastPurgeTime = now;
|
|
|
+ //
|
|
|
+ // If there were some calls that have not been sent out for a
|
|
|
+ // long time, discard them.
|
|
|
+ //
|
|
|
+ LOG.debug("Checking for old call responses.");
|
|
|
+ iter = writeSelector.keys().iterator();
|
|
|
+ while (iter.hasNext()) {
|
|
|
+ key = (SelectionKey)iter.next();
|
|
|
+ try {
|
|
|
+ doPurge(key, now);
|
|
|
+ } catch (IOException e) {
|
|
|
+ LOG.warn("Error in purging old calls " + e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } catch (OutOfMemoryError e) {
|
|
|
+ //
|
|
|
+ // we can run out of memory if we have too many threads
|
|
|
+ // log the event and sleep for a minute and give
|
|
|
+ // some thread(s) a chance to finish
|
|
|
+ //
|
|
|
+ LOG.warn("Out of Memory in server select", e);
|
|
|
+ try { Thread.sleep(60000); } catch (Exception ie) {}
|
|
|
+ } catch (Exception e) {
|
|
|
+ LOG.warn("Exception in Responder " + e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ LOG.info("Stopping " + this.getName());
|
|
|
+ }
|
|
|
+
|
|
|
+ private void doAsyncWrite(SelectionKey key) throws IOException {
|
|
|
+ Call call = (Call)key.attachment();
|
|
|
+ if (call == null) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ if (key.channel() != call.connection.channel) {
|
|
|
+ throw new IOException("doAsyncWrite: bad channel");
|
|
|
+ }
|
|
|
+ if (processResponse(call.connection.responseQueue)) {
|
|
|
+ key.cancel(); // remove item from selector.
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ //
|
|
|
+ // Remove calls that have been pending in the responseQueue
|
|
|
+ // for a long time.
|
|
|
+ //
|
|
|
+ private void doPurge(SelectionKey key, long now) throws IOException {
|
|
|
+ Call call = (Call)key.attachment();
|
|
|
+ if (call == null) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ if (key.channel() != call.connection.channel) {
|
|
|
+ LOG.info("doPurge: bad channel");
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ LinkedList<Call> responseQueue = call.connection.responseQueue;
|
|
|
+ synchronized (responseQueue) {
|
|
|
+ Iterator iter = responseQueue.listIterator(0);
|
|
|
+ while (iter.hasNext()) {
|
|
|
+ call = (Call)iter.next();
|
|
|
+ if (now > call.receivedTime + maxCallStartAge) {
|
|
|
+ LOG.info(getName() + ", call " + call +
|
|
|
+ ": response discarded for being too old (" +
|
|
|
+ (now - call.receivedTime) + ")");
|
|
|
+ iter.remove();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // If all the calls for this channel were removed, then
|
|
|
+ // remove this channel from the selector
|
|
|
+ if (responseQueue.size() == 0) {
|
|
|
+ key.cancel();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // Processes one response. Returns true if there are no more pending
|
|
|
+ // data for this channel.
|
|
|
+ //
|
|
|
+ private boolean processResponse(LinkedList<Call> responseQueue) throws IOException {
|
|
|
+ boolean error = true;
|
|
|
+ boolean done = false; // there is more data for this channel.
|
|
|
+ int numElements = 0;
|
|
|
+ Call call = null;
|
|
|
+ try {
|
|
|
+ synchronized (responseQueue) {
|
|
|
+ //
|
|
|
+ // If there are no items for this channel, then we are done
|
|
|
+ //
|
|
|
+ numElements = responseQueue.size();
|
|
|
+ if (numElements == 0) {
|
|
|
+ error = false;
|
|
|
+ return true; // no more data for this channel.
|
|
|
+ }
|
|
|
+ //
|
|
|
+ // Extract the first call
|
|
|
+ //
|
|
|
+ int numBytes = 0;
|
|
|
+ call = responseQueue.removeFirst();
|
|
|
+ SocketChannel channel = call.connection.channel;
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
+ LOG.debug(getName() + ": responding to #" + call.id + " from " +
|
|
|
+ call.connection);
|
|
|
+ }
|
|
|
+ //
|
|
|
+ // Send as much data as we can in the non-blocking fashion
|
|
|
+ //
|
|
|
+ numBytes = channel.write(call.response);
|
|
|
+ if (!call.response.hasRemaining()) {
|
|
|
+ if (numElements == 1) { // last call fully processes.
|
|
|
+ done = true; // no more data for this channel.
|
|
|
+ } else {
|
|
|
+ done = false; // more calls pending to be sent.
|
|
|
+ }
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
+ LOG.debug(getName() + ": responding to #" + call.id + " from " +
|
|
|
+ call.connection + " Wrote " + numBytes + " bytes.");
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ //
|
|
|
+ // If we were unable to write the entire response out, then
|
|
|
+ // insert in Selector queue.
|
|
|
+ //
|
|
|
+ call.connection.responseQueue.addFirst(call);
|
|
|
+ setPending();
|
|
|
+ try {
|
|
|
+ // Wakeup the thread blocked on select, only then can the call
|
|
|
+ // to channel.register() complete.
|
|
|
+ writeSelector.wakeup();
|
|
|
+ SelectionKey readKey = channel.register(writeSelector,
|
|
|
+ SelectionKey.OP_WRITE);
|
|
|
+ readKey.attach(call);
|
|
|
+ } finally {
|
|
|
+ clearPending();
|
|
|
+ }
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
+ LOG.debug(getName() + ": responding to #" + call.id + " from " +
|
|
|
+ call.connection + " Wrote partial " + numBytes +
|
|
|
+ " bytes.");
|
|
|
+ }
|
|
|
+ done = false; // this call not fully processed.
|
|
|
+ }
|
|
|
+ error = false; // everything went off well
|
|
|
+ }
|
|
|
+ } finally {
|
|
|
+ if (error && call != null) {
|
|
|
+ LOG.warn(getName()+", call " + call + ": output error");
|
|
|
+ done = true; // error. no more data for this channel.
|
|
|
+ synchronized (connectionList) {
|
|
|
+ if (connectionList.remove(call.connection))
|
|
|
+ numConnections--;
|
|
|
+ }
|
|
|
+ call.connection.close();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return done;
|
|
|
+ }
|
|
|
+
|
|
|
+ //
|
|
|
+ // Enqueue a response from the application.
|
|
|
+ //
|
|
|
+ void doRespond(Call call) throws IOException {
|
|
|
+ synchronized (call.connection.responseQueue) {
|
|
|
+ call.connection.responseQueue.addLast(call);
|
|
|
+ if (call.connection.responseQueue.size() == 1) {
|
|
|
+ processResponse(call.connection.responseQueue);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private synchronized void setPending() { // call waiting to be enqueued.
|
|
|
+ pending = true;
|
|
|
+ }
|
|
|
+
|
|
|
+ private synchronized void clearPending() { // call done enqueueing.
|
|
|
+ pending = false;
|
|
|
+ notify();
|
|
|
+ }
|
|
|
+
|
|
|
+ private synchronized void waitPending() throws InterruptedException {
|
|
|
+ while (pending) {
|
|
|
+ wait();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
/** Reads calls from a connection and queues them for handling. */
|
|
|
private class Connection {
|
|
|
private boolean versionRead = false; //if initial signature and
|
|
@@ -438,8 +678,7 @@ public abstract class Server {
|
|
|
private SelectionKey key;
|
|
|
private ByteBuffer data;
|
|
|
private ByteBuffer dataLengthBuffer;
|
|
|
- private DataOutputStream out;
|
|
|
- private SocketChannelOutputStream channelOut;
|
|
|
+ private LinkedList<Call> responseQueue;
|
|
|
private long lastContact;
|
|
|
private int dataLength;
|
|
|
private Socket socket;
|
|
@@ -457,9 +696,6 @@ public abstract class Server {
|
|
|
this.data = null;
|
|
|
this.dataLengthBuffer = ByteBuffer.allocate(4);
|
|
|
this.socket = channel.socket();
|
|
|
- this.out = new DataOutputStream
|
|
|
- (new BufferedOutputStream(
|
|
|
- this.channelOut = new SocketChannelOutputStream(channel)));
|
|
|
InetAddress addr = socket.getInetAddress();
|
|
|
if (addr == null) {
|
|
|
this.hostAddress = "*Unknown*";
|
|
@@ -467,8 +703,18 @@ public abstract class Server {
|
|
|
this.hostAddress = addr.getHostAddress();
|
|
|
}
|
|
|
this.remotePort = socket.getPort();
|
|
|
+ this.responseQueue = new LinkedList<Call>();
|
|
|
+ if (socketSendBufferSize != 0) {
|
|
|
+ try {
|
|
|
+ socket.setSendBufferSize(socketSendBufferSize);
|
|
|
+ } catch (IOException e) {
|
|
|
+ LOG.warn("Connection: unable to set socket send buffer size to " +
|
|
|
+ socketSendBufferSize);
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
+ @Override
|
|
|
public String toString() {
|
|
|
return getHostAddress() + ":" + remotePort;
|
|
|
}
|
|
@@ -516,7 +762,9 @@ public abstract class Server {
|
|
|
if (!HEADER.equals(dataLengthBuffer) || version != CURRENT_VERSION) {
|
|
|
//Warning is ok since this is not supposed to happen.
|
|
|
LOG.warn("Incorrect header or version mismatch from " +
|
|
|
- hostAddress + ":" + remotePort);
|
|
|
+ hostAddress + ":" + remotePort +
|
|
|
+ " got version " + version +
|
|
|
+ " expected version " + CURRENT_VERSION);
|
|
|
return -1;
|
|
|
}
|
|
|
dataLengthBuffer.clear();
|
|
@@ -589,8 +837,6 @@ public abstract class Server {
|
|
|
if (!channel.isOpen())
|
|
|
return;
|
|
|
try {socket.shutdownOutput();} catch(Exception e) {}
|
|
|
- try {out.close();} catch(Exception e) {}
|
|
|
- try {channelOut.destroy();} catch(Exception e) {}
|
|
|
if (channel.isOpen()) {
|
|
|
try {channel.close();} catch(Exception e) {}
|
|
|
}
|
|
@@ -607,9 +853,11 @@ public abstract class Server {
|
|
|
this.setName("IPC Server handler "+ instanceNumber + " on " + port);
|
|
|
}
|
|
|
|
|
|
+ @Override
|
|
|
public void run() {
|
|
|
LOG.info(getName() + ": starting");
|
|
|
SERVER.set(Server.this);
|
|
|
+ ByteArrayOutputStream buf = new ByteArrayOutputStream(10240);
|
|
|
while (running) {
|
|
|
try {
|
|
|
Call call;
|
|
@@ -648,28 +896,20 @@ public abstract class Server {
|
|
|
error = StringUtils.stringifyException(e);
|
|
|
}
|
|
|
CurCall.set(null);
|
|
|
-
|
|
|
- DataOutputStream out = call.connection.out;
|
|
|
- synchronized (out) {
|
|
|
- try {
|
|
|
- out.writeInt(call.id); // write call id
|
|
|
- out.writeBoolean(error!=null); // write error flag
|
|
|
- if (error == null) {
|
|
|
- value.write(out);
|
|
|
- } else {
|
|
|
- WritableUtils.writeString(out, errorClass);
|
|
|
- WritableUtils.writeString(out, error);
|
|
|
- }
|
|
|
- out.flush();
|
|
|
- } catch (Exception e) {
|
|
|
- LOG.warn(getName()+", call "+call+": output error", e);
|
|
|
- synchronized (connectionList) {
|
|
|
- if (connectionList.remove(call.connection))
|
|
|
- numConnections--;
|
|
|
- }
|
|
|
- call.connection.close();
|
|
|
- }
|
|
|
+
|
|
|
+ buf.reset();
|
|
|
+ DataOutputStream out = new DataOutputStream(buf);
|
|
|
+ out.writeInt(call.id); // write call id
|
|
|
+ out.writeBoolean(error != null); // write error flag
|
|
|
+
|
|
|
+ if (error == null) {
|
|
|
+ value.write(out);
|
|
|
+ } else {
|
|
|
+ WritableUtils.writeString(out, errorClass);
|
|
|
+ WritableUtils.writeString(out, error);
|
|
|
}
|
|
|
+ call.setResponse(ByteBuffer.wrap(buf.toByteArray()));
|
|
|
+ responder.doRespond(call);
|
|
|
} catch (InterruptedException e) {
|
|
|
if (running) { // unexpected -- log it
|
|
|
LOG.info(getName() + " caught: " + e, e);
|
|
@@ -695,6 +935,7 @@ public abstract class Server {
|
|
|
this.paramClass = paramClass;
|
|
|
this.handlerCount = handlerCount;
|
|
|
this.timeout = conf.getInt("ipc.client.timeout", 10000);
|
|
|
+ this.socketSendBufferSize = 0;
|
|
|
maxCallStartAge = (long) (timeout * MAX_CALL_QUEUE_TIME);
|
|
|
maxQueueSize = handlerCount * MAX_QUEUE_SIZE_PER_HANDLER;
|
|
|
this.maxIdleTime = conf.getInt("ipc.client.maxidletime", 120000);
|
|
@@ -704,13 +945,20 @@ public abstract class Server {
|
|
|
// Start the listener here and let it bind to the port
|
|
|
listener = new Listener();
|
|
|
this.port = listener.getAddress().getPort();
|
|
|
+
|
|
|
+ // Create the responder here
|
|
|
+ responder = new Responder();
|
|
|
}
|
|
|
|
|
|
/** Sets the timeout used for network i/o. */
|
|
|
public void setTimeout(int timeout) { this.timeout = timeout; }
|
|
|
|
|
|
+ /** Sets the socket buffer size used for responding to RPCs */
|
|
|
+ public void setSocketSendBufSize(int size) { this.socketSendBufferSize = size; }
|
|
|
+
|
|
|
/** Starts the service. Must be called before any calls will be handled. */
|
|
|
public synchronized void start() throws IOException {
|
|
|
+ responder.start();
|
|
|
listener.start();
|
|
|
handlers = new Handler[handlerCount];
|
|
|
|
|
@@ -733,6 +981,7 @@ public abstract class Server {
|
|
|
}
|
|
|
listener.interrupt();
|
|
|
listener.doStop();
|
|
|
+ responder.interrupt();
|
|
|
notifyAll();
|
|
|
}
|
|
|
|