|
@@ -24,22 +24,23 @@ import java.net.SocketTimeoutException;
|
|
|
import java.net.UnknownHostException;
|
|
|
|
|
|
import java.io.IOException;
|
|
|
-import java.io.EOFException;
|
|
|
import java.io.DataInputStream;
|
|
|
import java.io.DataOutputStream;
|
|
|
import java.io.BufferedInputStream;
|
|
|
import java.io.BufferedOutputStream;
|
|
|
import java.io.FilterInputStream;
|
|
|
-import java.io.FilterOutputStream;
|
|
|
+import java.io.InputStream;
|
|
|
|
|
|
import java.util.Hashtable;
|
|
|
import java.util.Iterator;
|
|
|
+import java.util.Map.Entry;
|
|
|
|
|
|
import javax.net.SocketFactory;
|
|
|
|
|
|
import org.apache.commons.logging.*;
|
|
|
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
+import org.apache.hadoop.io.IOUtils;
|
|
|
import org.apache.hadoop.io.ObjectWritable;
|
|
|
import org.apache.hadoop.io.Writable;
|
|
|
import org.apache.hadoop.io.WritableUtils;
|
|
@@ -62,28 +63,65 @@ public class Client {
|
|
|
private Hashtable<ConnectionId, Connection> connections =
|
|
|
new Hashtable<ConnectionId, Connection>();
|
|
|
|
|
|
- private Class valueClass; // class of call values
|
|
|
- private int timeout;// timeout for calls
|
|
|
+ private Class<?> valueClass; // class of call values
|
|
|
private int counter; // counter for call ids
|
|
|
private boolean running = true; // true while client runs
|
|
|
- private Configuration conf;
|
|
|
- private int maxIdleTime; //connections will be culled if it was idle for
|
|
|
+ final private Configuration conf;
|
|
|
+ final private int maxIdleTime; //connections will be culled if it was idle for
|
|
|
//maxIdleTime msecs
|
|
|
final private int maxRetries; //the max. no. of retries for socket connections
|
|
|
private boolean tcpNoDelay; // if T then disable Nagle's Algorithm
|
|
|
- private Thread connectionCullerThread;
|
|
|
+ private int pingInterval; // how often sends ping to the server in msecs
|
|
|
+
|
|
|
private SocketFactory socketFactory; // how to create sockets
|
|
|
-
|
|
|
private int refCount = 1;
|
|
|
|
|
|
+ final private static String PING_INTERVAL_NAME = "ipc.ping.interval";
|
|
|
+ final public static int DEFAULT_PING_INTERVAL = 60000; // 1 min
|
|
|
+ final static int PING_CALL_ID = -1;
|
|
|
+
|
|
|
+ /**
|
|
|
+ * set the ping interval value in configuration
|
|
|
+ *
|
|
|
+ * @param conf Configuration
|
|
|
+ * @param pingInterval the ping interval
|
|
|
+ */
|
|
|
+ final public static void setPingInterval(Configuration conf, int pingInterval) {
|
|
|
+ conf.setInt(PING_INTERVAL_NAME, pingInterval);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Get the ping interval from configuration;
|
|
|
+ * If not set in the configuration, return the default value.
|
|
|
+ *
|
|
|
+ * @param conf Configuration
|
|
|
+ * @return the ping interval
|
|
|
+ */
|
|
|
+ final static int getPingInterval(Configuration conf) {
|
|
|
+ return conf.getInt(PING_INTERVAL_NAME, DEFAULT_PING_INTERVAL);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Increment this client's reference count
|
|
|
+ *
|
|
|
+ */
|
|
|
synchronized void incCount() {
|
|
|
- refCount++;
|
|
|
+ refCount++;
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Decrement this client's reference count
|
|
|
+ *
|
|
|
+ */
|
|
|
synchronized void decCount() {
|
|
|
refCount--;
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Return if this client has no reference
|
|
|
+ *
|
|
|
+ * @return true if this client has no reference; false otherwise
|
|
|
+ */
|
|
|
synchronized boolean isZeroReference() {
|
|
|
return refCount==0;
|
|
|
}
|
|
@@ -93,9 +131,7 @@ public class Client {
|
|
|
int id; // call id
|
|
|
Writable param; // parameter
|
|
|
Writable value; // value, null if error
|
|
|
- String error; // exception, null if value
|
|
|
- String errorClass; // class of exception
|
|
|
- long lastActivity; // time of last i/o
|
|
|
+ IOException error; // exception, null if value
|
|
|
boolean done; // true when call is done
|
|
|
|
|
|
protected Call(Writable param) {
|
|
@@ -103,30 +139,34 @@ public class Client {
|
|
|
synchronized (Client.this) {
|
|
|
this.id = counter++;
|
|
|
}
|
|
|
- touch();
|
|
|
}
|
|
|
|
|
|
- /** Called by the connection thread when the call is complete and the
|
|
|
- * value or error string are available. Notifies by default. */
|
|
|
- public synchronized void callComplete() {
|
|
|
+ /** Indicate when the call is complete and the
|
|
|
+ * value or error are available. Notifies by default. */
|
|
|
+ protected synchronized void callComplete() {
|
|
|
+ this.done = true;
|
|
|
notify(); // notify caller
|
|
|
}
|
|
|
|
|
|
- /** Update lastActivity with the current time. */
|
|
|
- public synchronized void touch() {
|
|
|
- lastActivity = System.currentTimeMillis();
|
|
|
- }
|
|
|
-
|
|
|
- /** Update lastActivity with the current time. */
|
|
|
- public synchronized void setResult(Writable value,
|
|
|
- String errorClass,
|
|
|
- String error) {
|
|
|
- this.value = value;
|
|
|
+ /** Set the exception when there is an error.
|
|
|
+ * Notify the caller the call is done.
|
|
|
+ *
|
|
|
+ * @param error exception thrown by the call; either local or remote
|
|
|
+ */
|
|
|
+ public synchronized void setException(IOException error) {
|
|
|
this.error = error;
|
|
|
- this.errorClass =errorClass;
|
|
|
- this.done = true;
|
|
|
+ callComplete();
|
|
|
}
|
|
|
|
|
|
+ /** Set the return value when there is no error.
|
|
|
+ * Notify the caller the call is done.
|
|
|
+ *
|
|
|
+ * @param value return value of the call.
|
|
|
+ */
|
|
|
+ public synchronized void setValue(Writable value) {
|
|
|
+ this.value = value;
|
|
|
+ callComplete();
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
/** Thread that reads responses and notifies callers. Each connection owns a
|
|
@@ -139,11 +179,9 @@ public class Client {
|
|
|
private DataOutputStream out;
|
|
|
// currently active calls
|
|
|
private Hashtable<Integer, Call> calls = new Hashtable<Integer, Call>();
|
|
|
- private Call readingCall;
|
|
|
- private Call writingCall;
|
|
|
- private int inUse = 0;
|
|
|
- private long lastActivity = 0;
|
|
|
- private boolean shouldCloseConnection = false;
|
|
|
+ private long lastActivity = 0; // last I/O activity time
|
|
|
+ private boolean shouldCloseConnection = false; // indicate if the connection is closed
|
|
|
+ private IOException closeException; // close reason
|
|
|
|
|
|
public Connection(InetSocketAddress address) throws IOException {
|
|
|
this(new ConnectionId(address, null));
|
|
@@ -155,59 +193,129 @@ public class Client {
|
|
|
remoteId.getAddress().getHostName());
|
|
|
}
|
|
|
this.remoteId = remoteId;
|
|
|
- this.setName("IPC Client connection to " +
|
|
|
- remoteId.getAddress().toString());
|
|
|
+ UserGroupInformation ticket = remoteId.getTicket();
|
|
|
+ this.setName("IPC Client (" + socketFactory.hashCode() +") connection to " +
|
|
|
+ remoteId.getAddress().toString() +
|
|
|
+ " from " + ((ticket==null)?"an unknown user":ticket.getUserName()));
|
|
|
this.setDaemon(true);
|
|
|
}
|
|
|
|
|
|
- public synchronized void setupIOstreams() throws IOException {
|
|
|
- if (socket != null) {
|
|
|
- notify();
|
|
|
+ /** Update lastActivity with the current time. */
|
|
|
+ private synchronized void touch() {
|
|
|
+ touch(System.currentTimeMillis());
|
|
|
+ }
|
|
|
+
|
|
|
+ private synchronized void touch(long curTime) {
|
|
|
+ lastActivity = curTime;
|
|
|
+ }
|
|
|
+
|
|
|
+ /** Add a call to this connection's call queue */
|
|
|
+ private synchronized boolean addCall(Call call) {
|
|
|
+ if (shouldCloseConnection)
|
|
|
+ return false;
|
|
|
+ calls.put(call.id, call);
|
|
|
+ notify();
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+
|
|
|
+ /** This class sends a ping to the remote side when a read
|
|
|
+ * times out. If no failure is detected, it retries until at least
|
|
|
+ * a byte is read.
|
|
|
+ */
|
|
|
+ private class PingInputStream extends FilterInputStream {
|
|
|
+ /* constructor */
|
|
|
+ protected PingInputStream(InputStream in) {
|
|
|
+ super(in);
|
|
|
+ }
|
|
|
+
|
|
|
+ /* Process timeout exception
|
|
|
+ * if the connection is not going to be closed, send a ping.
|
|
|
+ * otherwise, throw the timeout exception.
|
|
|
+ */
|
|
|
+ private void handleTimeout(SocketTimeoutException e) throws IOException {
|
|
|
+ if (shouldCloseConnection || !running) {
|
|
|
+ throw e;
|
|
|
+ } else {
|
|
|
+ sendPing();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /** Read a byte from the stream.
|
|
|
+ * Send a ping if the read times out. Retries if no failure is detected
|
|
|
+ * until a byte is read.
|
|
|
+ */
|
|
|
+ public int read() throws IOException {
|
|
|
+ do {
|
|
|
+ try {
|
|
|
+ return super.read();
|
|
|
+ } catch (SocketTimeoutException e) {
|
|
|
+ handleTimeout(e);
|
|
|
+ }
|
|
|
+ } while (true);
|
|
|
+ }
|
|
|
+
|
|
|
+ /** Read bytes into a buffer starting from offset <code>off</code>
|
|
|
+ * Send a ping if the read times out. Retries if no failure is detected
|
|
|
+ * until a byte is read.
|
|
|
+ *
|
|
|
+ * @return the total number of bytes read; -1 if the connection is closed.
|
|
|
+ */
|
|
|
+ public int read(byte[] buf, int off, int len) throws IOException {
|
|
|
+ do {
|
|
|
+ try {
|
|
|
+ return super.read(buf, off, len);
|
|
|
+ } catch (SocketTimeoutException e) {
|
|
|
+ handleTimeout(e);
|
|
|
+ }
|
|
|
+ } while (true);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /** Connect to the server and set up the I/O streams. It then sends
|
|
|
+ * a header to the server and starts
|
|
|
+ * the connection thread that waits for responses.
|
|
|
+ */
|
|
|
+ private synchronized void setupIOstreams() {
|
|
|
+ if (socket != null || shouldCloseConnection) {
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
short ioFailures = 0;
|
|
|
short timeoutFailures = 0;
|
|
|
- while (true) {
|
|
|
- try {
|
|
|
- this.socket = socketFactory.createSocket();
|
|
|
- this.socket.setTcpNoDelay(tcpNoDelay);
|
|
|
- // connection time out is 20s
|
|
|
- this.socket.connect(remoteId.getAddress(), 20000);
|
|
|
- break;
|
|
|
- } catch (SocketTimeoutException toe) {
|
|
|
- /* The max number of retries is 45,
|
|
|
- * which amounts to 20s*45 = 15 minutes retries.
|
|
|
- */
|
|
|
- handleConnectionFailure(timeoutFailures++, 45, toe);
|
|
|
- } catch (IOException ie) {
|
|
|
- handleConnectionFailure(ioFailures++, maxRetries, ie);
|
|
|
+ try {
|
|
|
+ LOG.info("Build a connection to "+remoteId.getAddress());
|
|
|
+ while (true) {
|
|
|
+ try {
|
|
|
+ this.socket = socketFactory.createSocket();
|
|
|
+ this.socket.setTcpNoDelay(tcpNoDelay);
|
|
|
+ // connection time out is 20s
|
|
|
+ this.socket.connect(remoteId.getAddress(), 20000);
|
|
|
+ this.socket.setSoTimeout(pingInterval);
|
|
|
+ break;
|
|
|
+ } catch (SocketTimeoutException toe) {
|
|
|
+ /* The max number of retries is 45,
|
|
|
+ * which amounts to 20s*45 = 15 minutes retries.
|
|
|
+ */
|
|
|
+ handleConnectionFailure(timeoutFailures++, 45, toe);
|
|
|
+ } catch (IOException ie) {
|
|
|
+ handleConnectionFailure(ioFailures++, maxRetries, ie);
|
|
|
+ }
|
|
|
}
|
|
|
+ this.in = new DataInputStream(new BufferedInputStream
|
|
|
+ (new PingInputStream(NetUtils.getInputStream(socket))));
|
|
|
+ this.out = new DataOutputStream
|
|
|
+ (new BufferedOutputStream(NetUtils.getOutputStream(socket)));
|
|
|
+ writeHeader();
|
|
|
+
|
|
|
+ // update last activity time
|
|
|
+ touch();
|
|
|
+
|
|
|
+ } catch (IOException e) {
|
|
|
+ markClosed(e);
|
|
|
+ close();
|
|
|
}
|
|
|
- socket.setSoTimeout(timeout);
|
|
|
- this.in = new DataInputStream
|
|
|
- (new BufferedInputStream
|
|
|
- (new FilterInputStream(NetUtils.getInputStream(socket)) {
|
|
|
- public int read(byte[] buf, int off, int len) throws IOException {
|
|
|
- int value = super.read(buf, off, len);
|
|
|
- if (readingCall != null) {
|
|
|
- readingCall.touch();
|
|
|
- }
|
|
|
- return value;
|
|
|
- }
|
|
|
- }));
|
|
|
- this.out = new DataOutputStream
|
|
|
- (new BufferedOutputStream
|
|
|
- (new FilterOutputStream(NetUtils.getOutputStream(socket)) {
|
|
|
- public void write(byte[] buf, int o, int len) throws IOException {
|
|
|
- out.write(buf, o, len);
|
|
|
- if (writingCall != null) {
|
|
|
- writingCall.touch();
|
|
|
- }
|
|
|
- }
|
|
|
- }));
|
|
|
- writeHeader();
|
|
|
- notify();
|
|
|
+ // start the receiver thread after the socket connection has been set up
|
|
|
+ start();
|
|
|
}
|
|
|
|
|
|
/* Handle connection failures
|
|
@@ -235,11 +343,6 @@ public class Client {
|
|
|
|
|
|
// throw the exception if the maximum number of retries is reached
|
|
|
if (curRetries == maxRetries) {
|
|
|
- //reset inUse so that the culler gets a chance to throw this
|
|
|
- //connection object out of the table. We don't want to increment
|
|
|
- //inUse to infinity (everytime getConnection is called inUse is
|
|
|
- //incremented)!
|
|
|
- inUse = 0;
|
|
|
throw ioe;
|
|
|
}
|
|
|
|
|
@@ -251,8 +354,11 @@ public class Client {
|
|
|
LOG.info("Retrying connect to server: " + remoteId.getAddress() +
|
|
|
". Already tried " + curRetries + " time(s).");
|
|
|
}
|
|
|
-
|
|
|
- private synchronized void writeHeader() throws IOException {
|
|
|
+
|
|
|
+ /* Write the header for each connection
|
|
|
+ * Out is not synchronized because only the first thread does this.
|
|
|
+ */
|
|
|
+ private void writeHeader() throws IOException {
|
|
|
out.write(Server.HEADER.array());
|
|
|
out.write(Server.CURRENT_VERSION);
|
|
|
//When there are more fields we can have ConnectionHeader Writable.
|
|
@@ -264,155 +370,197 @@ public class Client {
|
|
|
out.write(buf.getData(), 0, bufLen);
|
|
|
}
|
|
|
|
|
|
+ /* Wait until someone signals us to start reading an RPC response,
|
|
|
+ * the connection has been idle too long or is marked to be closed,
|
|
|
+ * or the client is marked as not running.
|
|
|
+ *
|
|
|
+ * Return true if it is time to read a response; false otherwise.
|
|
|
+ */
|
|
|
private synchronized boolean waitForWork() {
|
|
|
- //wait till someone signals us to start reading RPC response or
|
|
|
- //close the connection. If we are idle long enough (blocked in wait),
|
|
|
- //the ConnectionCuller thread will wake us up and ask us to close the
|
|
|
- //connection.
|
|
|
- //We need to wait when inUse is 0 or socket is null (it may be null if
|
|
|
- //the Connection object has been created but the socket connection
|
|
|
- //has not been setup yet). We stop waiting if we have been asked to close
|
|
|
- //connection
|
|
|
- while ((inUse == 0 || socket == null) && !shouldCloseConnection) {
|
|
|
- try {
|
|
|
- wait();
|
|
|
- } catch (InterruptedException e) {}
|
|
|
+ if (calls.isEmpty() && !shouldCloseConnection && running) {
|
|
|
+ long timeout = maxIdleTime-(System.currentTimeMillis()-lastActivity);
|
|
|
+ if (timeout>0) {
|
|
|
+ try {
|
|
|
+ wait(timeout);
|
|
|
+ } catch (InterruptedException e) {}
|
|
|
+ }
|
|
|
}
|
|
|
- return !shouldCloseConnection;
|
|
|
- }
|
|
|
-
|
|
|
- private synchronized void incrementRef() {
|
|
|
- inUse++;
|
|
|
- }
|
|
|
-
|
|
|
- private synchronized void decrementRef() {
|
|
|
- lastActivity = System.currentTimeMillis();
|
|
|
- inUse--;
|
|
|
- }
|
|
|
-
|
|
|
- public synchronized boolean isIdle() {
|
|
|
- //check whether the connection is in use or just created
|
|
|
- if (inUse != 0) return false;
|
|
|
- long currTime = System.currentTimeMillis();
|
|
|
- if (currTime - lastActivity > maxIdleTime)
|
|
|
+
|
|
|
+ if (!calls.isEmpty() && !shouldCloseConnection && running) {
|
|
|
return true;
|
|
|
- return false;
|
|
|
+ } else if (shouldCloseConnection) {
|
|
|
+ return false;
|
|
|
+ } else if (!running) { //get stopped
|
|
|
+ markClosed((IOException)new IOException().initCause(
|
|
|
+ new InterruptedException()));
|
|
|
+ return false;
|
|
|
+ } else { // closed because it has been idle for more than maxIdleTime
|
|
|
+ markClosed(null);
|
|
|
+ return false;
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
public InetSocketAddress getRemoteAddress() {
|
|
|
return remoteId.getAddress();
|
|
|
}
|
|
|
|
|
|
- public void setCloseConnection() {
|
|
|
- shouldCloseConnection = true;
|
|
|
+ /* Send a ping to the server if the time elapsed
|
|
|
+ * since last I/O activity is equal to or greater than the ping interval
|
|
|
+ */
|
|
|
+ private synchronized void sendPing() throws IOException {
|
|
|
+ long curTime = System.currentTimeMillis();
|
|
|
+ if ( curTime - lastActivity >= pingInterval) {
|
|
|
+ touch(curTime);
|
|
|
+ synchronized (out) {
|
|
|
+ out.writeInt(PING_CALL_ID);
|
|
|
+ out.flush();
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
public void run() {
|
|
|
if (LOG.isDebugEnabled())
|
|
|
- LOG.debug(getName() + ": starting");
|
|
|
- try {
|
|
|
- while (running) {
|
|
|
- int id;
|
|
|
- //wait here for work - read connection or close connection
|
|
|
- if (waitForWork() == false)
|
|
|
- break;
|
|
|
- try {
|
|
|
- id = in.readInt(); // try to read an id
|
|
|
- } catch (SocketTimeoutException e) {
|
|
|
- continue;
|
|
|
- }
|
|
|
+ LOG.debug(getName() + ": starting, having connections "
|
|
|
+ + connections.size());
|
|
|
|
|
|
- if (LOG.isDebugEnabled())
|
|
|
- LOG.debug(getName() + " got value #" + id);
|
|
|
-
|
|
|
- Call call = calls.remove(id);
|
|
|
- boolean isError = in.readBoolean(); // read if error
|
|
|
- if (isError) {
|
|
|
- call.setResult(null, WritableUtils.readString(in),
|
|
|
- WritableUtils.readString(in));
|
|
|
- } else {
|
|
|
- Writable value = (Writable)ReflectionUtils.newInstance(valueClass, conf);
|
|
|
- try {
|
|
|
- readingCall = call;
|
|
|
- value.readFields(in); // read value
|
|
|
- } finally {
|
|
|
- readingCall = null;
|
|
|
- }
|
|
|
- call.setResult(value, null, null);
|
|
|
- }
|
|
|
- call.callComplete(); // deliver result to caller
|
|
|
- //received the response. So decrement the ref count
|
|
|
- decrementRef();
|
|
|
- }
|
|
|
- } catch (EOFException eof) {
|
|
|
- // This is what happens when the remote side goes down
|
|
|
- } catch (Exception e) {
|
|
|
- LOG.info(StringUtils.stringifyException(e));
|
|
|
- } finally {
|
|
|
- //If there was no exception thrown in this method, then the only
|
|
|
- //way we reached here is by breaking out of the while loop (after
|
|
|
- //waitForWork). And if we took that route to reach here, we have
|
|
|
- //already removed the connection object in the ConnectionCuller thread.
|
|
|
- //We don't want to remove this again as some other thread might have
|
|
|
- //actually put a new Connection object in the table in the meantime.
|
|
|
- synchronized (connections) {
|
|
|
- if (connections.get(remoteId) == this) {
|
|
|
- connections.remove(remoteId);
|
|
|
- }
|
|
|
- }
|
|
|
- close();
|
|
|
+ while (waitForWork()) {//wait here for work - read or close connection
|
|
|
+ receiveResponse();
|
|
|
}
|
|
|
+
|
|
|
+ close();
|
|
|
+
|
|
|
+ if (LOG.isDebugEnabled())
|
|
|
+ LOG.debug(getName() + ": stopped, remaining connections "
|
|
|
+ + connections.size());
|
|
|
}
|
|
|
|
|
|
/** Initiates a call by sending the parameter to the remote server.
|
|
|
* Note: this is not called from the Connection thread, but by other
|
|
|
* threads.
|
|
|
*/
|
|
|
- public void sendParam(Call call) throws IOException {
|
|
|
- boolean error = true;
|
|
|
+ public void sendParam(Call call) {
|
|
|
+ synchronized (this) {
|
|
|
+ if (shouldCloseConnection) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
try {
|
|
|
- calls.put(call.id, call);
|
|
|
synchronized (out) {
|
|
|
if (LOG.isDebugEnabled())
|
|
|
LOG.debug(getName() + " sending #" + call.id);
|
|
|
- try {
|
|
|
- writingCall = call;
|
|
|
- DataOutputBuffer d = new DataOutputBuffer(); //for serializing the
|
|
|
- //data to be written
|
|
|
- d.writeInt(call.id);
|
|
|
- call.param.write(d);
|
|
|
- byte[] data = d.getData();
|
|
|
- int dataLength = d.getLength();
|
|
|
-
|
|
|
- out.writeInt(dataLength); //first put the data length
|
|
|
- out.write(data, 0, dataLength);//write the data
|
|
|
- out.flush();
|
|
|
- } finally {
|
|
|
- writingCall = null;
|
|
|
- }
|
|
|
- }
|
|
|
- error = false;
|
|
|
- } finally {
|
|
|
- if (error) {
|
|
|
- synchronized (connections) {
|
|
|
- if (connections.get(remoteId) == this)
|
|
|
- connections.remove(remoteId);
|
|
|
- }
|
|
|
- close(); // close on error
|
|
|
+
|
|
|
+ DataOutputBuffer d = new DataOutputBuffer(); //for serializing the
|
|
|
+ //data to be written
|
|
|
+ d.writeInt(call.id);
|
|
|
+ call.param.write(d);
|
|
|
+ byte[] data = d.getData();
|
|
|
+ int dataLength = d.getLength();
|
|
|
+
|
|
|
+ out.writeInt(dataLength); //first put the data length
|
|
|
+ out.write(data, 0, dataLength);//write the data
|
|
|
+ out.flush();
|
|
|
}
|
|
|
+ } catch(IOException e) {
|
|
|
+ markClosed(e);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- /** Close the connection. */
|
|
|
- public void close() {
|
|
|
- //socket may be null if the connection could not be established to the
|
|
|
- //server in question, and the culler asked us to close the connection
|
|
|
- if (socket == null) return;
|
|
|
+ /* Receive a response.
|
|
|
+ * There is only one receiver, so no synchronization on in is needed.
|
|
|
+ */
|
|
|
+ private void receiveResponse() {
|
|
|
+ synchronized (this) {
|
|
|
+ if (shouldCloseConnection) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ touch();
|
|
|
+
|
|
|
try {
|
|
|
- socket.close(); // close socket
|
|
|
- } catch (IOException e) {}
|
|
|
+ int id = in.readInt(); // try to read an id
|
|
|
+
|
|
|
+ if (LOG.isDebugEnabled())
|
|
|
+ LOG.debug(getName() + " got value #" + id);
|
|
|
+
|
|
|
+ Call call = calls.remove(id);
|
|
|
+
|
|
|
+ boolean isError = in.readBoolean(); // read if error
|
|
|
+ if (isError) {
|
|
|
+ call.setException(new RemoteException( WritableUtils.readString(in),
|
|
|
+ WritableUtils.readString(in)));
|
|
|
+ } else {
|
|
|
+ Writable value = (Writable)ReflectionUtils.newInstance(valueClass, conf);
|
|
|
+ value.readFields(in); // read value
|
|
|
+ call.setValue(value);
|
|
|
+ }
|
|
|
+ } catch (IOException e) {
|
|
|
+ markClosed(e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private synchronized void markClosed(IOException e) {
|
|
|
+ if (!shouldCloseConnection) {
|
|
|
+ shouldCloseConnection = true;
|
|
|
+ closeException = e;
|
|
|
+ notifyAll();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /** Close the connection. */
|
|
|
+ private synchronized void close() {
|
|
|
+ if (!shouldCloseConnection) {
|
|
|
+ LOG.error("The connection is not in the closed state");
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ synchronized (out) {
|
|
|
+ // release the resources
|
|
|
+ // first thing to do: take the connection out of the connection list
|
|
|
+ synchronized (connections) {
|
|
|
+ if (connections.get(remoteId) == this) {
|
|
|
+ connections.remove(remoteId);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // close the socket and streams
|
|
|
+ IOUtils.closeStream(in);
|
|
|
+ IOUtils.closeStream(out);
|
|
|
+ IOUtils.closeSocket(socket);
|
|
|
+
|
|
|
+ // clean up all calls
|
|
|
+ if (closeException == null) {
|
|
|
+ if (!calls.isEmpty()) {
|
|
|
+ LOG.warn(
|
|
|
+ "A connection is closed for no cause and calls are not empty");
|
|
|
+
|
|
|
+ // clean up calls anyway
|
|
|
+ closeException = new IOException("Unexpected closed connection");
|
|
|
+ cleanupCalls();
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ // log the info
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
+ LOG.debug("closing ipc connection to " + remoteId.address + ": " +
|
|
|
+ StringUtils.stringifyException(closeException));
|
|
|
+ }
|
|
|
+
|
|
|
+ // cleanup calls
|
|
|
+ cleanupCalls();
|
|
|
+ }
|
|
|
+ }
|
|
|
if (LOG.isDebugEnabled())
|
|
|
- LOG.debug(getName() + ": closing");
|
|
|
+ LOG.debug(getName() + ": closed");
|
|
|
+ }
|
|
|
+
|
|
|
+ /* Cleanup all calls and mark them as done */
|
|
|
+ private void cleanupCalls() {
|
|
|
+ Iterator<Entry<Integer, Call>> itor = calls.entrySet().iterator() ;
|
|
|
+ while (itor.hasNext()) {
|
|
|
+ Call c = itor.next().getValue();
|
|
|
+ c.setException(closeException); // local exception
|
|
|
+ itor.remove();
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -428,7 +576,7 @@ public class Client {
|
|
|
}
|
|
|
|
|
|
/** Deliver result to result collector. */
|
|
|
- public void callComplete() {
|
|
|
+ protected void callComplete() {
|
|
|
results.callComplete(this);
|
|
|
}
|
|
|
}
|
|
@@ -453,58 +601,21 @@ public class Client {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private class ConnectionCuller extends Thread {
|
|
|
-
|
|
|
- public static final int MIN_SLEEP_TIME = 1000;
|
|
|
-
|
|
|
- public void run() {
|
|
|
-
|
|
|
- LOG.debug(getName() + ": starting");
|
|
|
-
|
|
|
- while (running) {
|
|
|
- try {
|
|
|
- Thread.sleep(MIN_SLEEP_TIME);
|
|
|
- } catch (InterruptedException ie) {}
|
|
|
-
|
|
|
- synchronized (connections) {
|
|
|
- Iterator i = connections.values().iterator();
|
|
|
- while (i.hasNext()) {
|
|
|
- Connection c = (Connection)i.next();
|
|
|
- if (c.isIdle()) {
|
|
|
- //We don't actually close the socket here (i.e., don't invoke
|
|
|
- //the close() method). We leave that work to the response receiver
|
|
|
- //thread. The reason for that is since we have taken a lock on the
|
|
|
- //connections table object, we don't want to slow down the entire
|
|
|
- //system if we happen to talk to a slow server.
|
|
|
- i.remove();
|
|
|
- synchronized (c) {
|
|
|
- c.setCloseConnection();
|
|
|
- c.notify();
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
/** Construct an IPC client whose values are of the given {@link Writable}
|
|
|
* class. */
|
|
|
public Client(Class valueClass, Configuration conf,
|
|
|
SocketFactory factory) {
|
|
|
this.valueClass = valueClass;
|
|
|
- this.timeout = conf.getInt("ipc.client.timeout", 10000);
|
|
|
- this.maxIdleTime = conf.getInt("ipc.client.connection.maxidletime", 1000);
|
|
|
+ this.maxIdleTime =
|
|
|
+ conf.getInt("ipc.client.connection.maxidletime", 10000); //10s
|
|
|
this.maxRetries = conf.getInt("ipc.client.connect.max.retries", 10);
|
|
|
this.tcpNoDelay = conf.getBoolean("ipc.client.tcpnodelay", false);
|
|
|
+ this.pingInterval = getPingInterval(conf);
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
+ LOG.debug("The ping interval is" + this.pingInterval + "ms.");
|
|
|
+ }
|
|
|
this.conf = conf;
|
|
|
this.socketFactory = factory;
|
|
|
- this.connectionCullerThread = new ConnectionCuller();
|
|
|
- connectionCullerThread.setDaemon(true);
|
|
|
- connectionCullerThread.setName(valueClass.getName() + " Connection Culler");
|
|
|
- LOG.debug(valueClass.getName() +
|
|
|
- "Connection culler maxidletime= " + maxIdleTime + "ms");
|
|
|
- connectionCullerThread.start();
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -535,19 +646,11 @@ public class Client {
|
|
|
return;
|
|
|
}
|
|
|
running = false;
|
|
|
-
|
|
|
- connectionCullerThread.interrupt();
|
|
|
- try {
|
|
|
- connectionCullerThread.join();
|
|
|
- } catch(InterruptedException e) {}
|
|
|
-
|
|
|
- // close and wake up all connections
|
|
|
+
|
|
|
+ // wake up all connections
|
|
|
synchronized (connections) {
|
|
|
for (Connection conn : connections.values()) {
|
|
|
- synchronized (conn) {
|
|
|
- conn.setCloseConnection();
|
|
|
- conn.notifyAll();
|
|
|
- }
|
|
|
+ conn.interrupt();
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -560,9 +663,6 @@ public class Client {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- /** Sets the timeout used for network i/o. */
|
|
|
- public void setTimeout(int timeout) { this.timeout = timeout; }
|
|
|
-
|
|
|
/** Make a call, passing <code>param</code>, to the IPC server running at
|
|
|
* <code>address</code>, returning the value. Throws exceptions if there are
|
|
|
* network problems or if the remote code threw an exception. */
|
|
@@ -574,20 +674,24 @@ public class Client {
|
|
|
public Writable call(Writable param, InetSocketAddress addr,
|
|
|
UserGroupInformation ticket)
|
|
|
throws InterruptedException, IOException {
|
|
|
- Connection connection = getConnection(addr, ticket);
|
|
|
Call call = new Call(param);
|
|
|
+ Connection connection = getConnection(addr, ticket, call);
|
|
|
+ connection.sendParam(call); // send the parameter
|
|
|
synchronized (call) {
|
|
|
- connection.sendParam(call); // send the parameter
|
|
|
- long wait = timeout;
|
|
|
- do {
|
|
|
- call.wait(wait); // wait for the result
|
|
|
- wait = timeout - (System.currentTimeMillis() - call.lastActivity);
|
|
|
- } while (!call.done && wait > 0);
|
|
|
+ while (!call.done) {
|
|
|
+ try {
|
|
|
+ call.wait(); // wait for the result
|
|
|
+ } catch (InterruptedException ignored) {}
|
|
|
+ }
|
|
|
|
|
|
if (call.error != null) {
|
|
|
- throw new RemoteException(call.errorClass, call.error);
|
|
|
- } else if (!call.done) {
|
|
|
- throw new SocketTimeoutException("timed out waiting for rpc response");
|
|
|
+ if (call.error instanceof RemoteException) {
|
|
|
+ call.error.fillInStackTrace();
|
|
|
+ throw call.error;
|
|
|
+ } else { // local exception
|
|
|
+ throw (IOException)new IOException(
|
|
|
+ "Call failed on local exception").initCause(call.error);
|
|
|
+ }
|
|
|
} else {
|
|
|
return call.value;
|
|
|
}
|
|
@@ -607,7 +711,7 @@ public class Client {
|
|
|
for (int i = 0; i < params.length; i++) {
|
|
|
ParallelCall call = new ParallelCall(params[i], results, i);
|
|
|
try {
|
|
|
- Connection connection = getConnection(addresses[i], null);
|
|
|
+ Connection connection = getConnection(addresses[i], null, call);
|
|
|
connection.sendParam(call); // send each parameter
|
|
|
} catch (IOException e) {
|
|
|
LOG.info("Calling "+addresses[i]+" caught: " +
|
|
@@ -615,38 +719,44 @@ public class Client {
|
|
|
results.size--; // wait for one fewer result
|
|
|
}
|
|
|
}
|
|
|
- try {
|
|
|
- results.wait(timeout); // wait for all results
|
|
|
- } catch (InterruptedException e) {}
|
|
|
-
|
|
|
- if (results.count == 0) {
|
|
|
- throw new IOException("no responses");
|
|
|
- } else {
|
|
|
- return results.values;
|
|
|
+ while (results.count != results.size) {
|
|
|
+ try {
|
|
|
+ results.wait(); // wait for all results
|
|
|
+ } catch (InterruptedException e) {}
|
|
|
}
|
|
|
+
|
|
|
+ return results.values;
|
|
|
}
|
|
|
}
|
|
|
|
|
|
/** Get a connection from the pool, or create a new one and add it to the
|
|
|
* pool. Connections to a given host/port are reused. */
|
|
|
private Connection getConnection(InetSocketAddress addr,
|
|
|
- UserGroupInformation ticket)
|
|
|
+ UserGroupInformation ticket,
|
|
|
+ Call call)
|
|
|
throws IOException {
|
|
|
+ synchronized (this) {
|
|
|
+ if (!running) {
|
|
|
+ // the client is stopped
|
|
|
+ throw new IOException("The client is stopped");
|
|
|
+ }
|
|
|
+ }
|
|
|
Connection connection;
|
|
|
/* we could avoid this allocation for each RPC by having a
|
|
|
* connectionsId object and with set() method. We need to manage the
|
|
|
* refs for keys in HashMap properly. For now its ok.
|
|
|
*/
|
|
|
ConnectionId remoteId = new ConnectionId(addr, ticket);
|
|
|
- synchronized (connections) {
|
|
|
- connection = connections.get(remoteId);
|
|
|
- if (connection == null) {
|
|
|
- connection = new Connection(remoteId);
|
|
|
- connections.put(remoteId, connection);
|
|
|
- connection.start();
|
|
|
+ do {
|
|
|
+ synchronized (connections) {
|
|
|
+ connection = connections.get(remoteId);
|
|
|
+ if (connection == null) {
|
|
|
+ connection = new Connection(remoteId);
|
|
|
+ connections.put(remoteId, connection);
|
|
|
+ }
|
|
|
}
|
|
|
- connection.incrementRef();
|
|
|
- }
|
|
|
+ } while (!connection.addCall(call));
|
|
|
+
|
|
|
//we don't invoke the method below inside "synchronized (connections)"
|
|
|
//block above. The reason for that is if the server happens to be slow,
|
|
|
//it will take longer to establish a connection and that will slow the
|