|
@@ -34,6 +34,9 @@ import java.io.InputStream;
|
|
import java.util.Hashtable;
|
|
import java.util.Hashtable;
|
|
import java.util.Iterator;
|
|
import java.util.Iterator;
|
|
import java.util.Map.Entry;
|
|
import java.util.Map.Entry;
|
|
|
|
+import java.util.concurrent.atomic.AtomicBoolean;
|
|
|
|
+import java.util.concurrent.atomic.AtomicLong;
|
|
|
|
+import java.util.concurrent.atomic.AtomicReference;
|
|
|
|
|
|
import javax.net.SocketFactory;
|
|
import javax.net.SocketFactory;
|
|
|
|
|
|
@@ -48,7 +51,6 @@ import org.apache.hadoop.io.DataOutputBuffer;
|
|
import org.apache.hadoop.net.NetUtils;
|
|
import org.apache.hadoop.net.NetUtils;
|
|
import org.apache.hadoop.security.UserGroupInformation;
|
|
import org.apache.hadoop.security.UserGroupInformation;
|
|
import org.apache.hadoop.util.ReflectionUtils;
|
|
import org.apache.hadoop.util.ReflectionUtils;
|
|
-import org.apache.hadoop.util.StringUtils;
|
|
|
|
|
|
|
|
/** A client for an IPC service. IPC calls take a single {@link Writable} as a
|
|
/** A client for an IPC service. IPC calls take a single {@link Writable} as a
|
|
* parameter, and return a {@link Writable} as their value. A service runs on
|
|
* parameter, and return a {@link Writable} as their value. A service runs on
|
|
@@ -65,7 +67,7 @@ public class Client {
|
|
|
|
|
|
private Class<?> valueClass; // class of call values
|
|
private Class<?> valueClass; // class of call values
|
|
private int counter; // counter for call ids
|
|
private int counter; // counter for call ids
|
|
- private boolean running = true; // true while client runs
|
|
|
|
|
|
+ private AtomicBoolean running = new AtomicBoolean(true); // if client runs
|
|
final private Configuration conf;
|
|
final private Configuration conf;
|
|
final private int maxIdleTime; //connections will be culled if it was idle for
|
|
final private int maxIdleTime; //connections will be culled if it was idle for
|
|
//maxIdleTime msecs
|
|
//maxIdleTime msecs
|
|
@@ -125,7 +127,7 @@ public class Client {
|
|
synchronized boolean isZeroReference() {
|
|
synchronized boolean isZeroReference() {
|
|
return refCount==0;
|
|
return refCount==0;
|
|
}
|
|
}
|
|
-
|
|
|
|
|
|
+
|
|
/** A call waiting for a value. */
|
|
/** A call waiting for a value. */
|
|
private class Call {
|
|
private class Call {
|
|
int id; // call id
|
|
int id; // call id
|
|
@@ -175,12 +177,13 @@ public class Client {
|
|
private class Connection extends Thread {
|
|
private class Connection extends Thread {
|
|
private ConnectionId remoteId;
|
|
private ConnectionId remoteId;
|
|
private Socket socket = null; // connected socket
|
|
private Socket socket = null; // connected socket
|
|
- private DataInputStream in;
|
|
|
|
- private DataOutputStream out;
|
|
|
|
|
|
+ private DataInputStream in;
|
|
|
|
+ private AtomicReference<DataOutputStream> out =
|
|
|
|
+ new AtomicReference<DataOutputStream>();
|
|
// currently active calls
|
|
// currently active calls
|
|
private Hashtable<Integer, Call> calls = new Hashtable<Integer, Call>();
|
|
private Hashtable<Integer, Call> calls = new Hashtable<Integer, Call>();
|
|
- private long lastActivity = 0; // last I/O activity time
|
|
|
|
- private boolean shouldCloseConnection = false; // indicate if the connection is closed
|
|
|
|
|
|
+ private AtomicLong lastActivity = new AtomicLong();// last I/O activity time
|
|
|
|
+ private AtomicBoolean shouldCloseConnection = new AtomicBoolean(); // indicate if the connection is closed
|
|
private IOException closeException; // close reason
|
|
private IOException closeException; // close reason
|
|
|
|
|
|
public Connection(InetSocketAddress address) throws IOException {
|
|
public Connection(InetSocketAddress address) throws IOException {
|
|
@@ -201,23 +204,25 @@ public class Client {
|
|
}
|
|
}
|
|
|
|
|
|
/** Update lastActivity with the current time. */
|
|
/** Update lastActivity with the current time. */
|
|
- private synchronized void touch() {
|
|
|
|
- touch(System.currentTimeMillis());
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- private synchronized void touch(long curTime) {
|
|
|
|
- lastActivity = curTime;
|
|
|
|
|
|
+ private void touch() {
|
|
|
|
+ lastActivity.set(System.currentTimeMillis());
|
|
}
|
|
}
|
|
|
|
|
|
- /** Add a call to this connection's call queue */
|
|
|
|
|
|
+ /**
|
|
|
|
+ * Add a call to this connection's call queue and notify
|
|
|
|
+ * a listener; synchronized.
|
|
|
|
+ * Returns false if called during shutdown.
|
|
|
|
+ * @param call to add
|
|
|
|
+ * @return true if the call was added.
|
|
|
|
+ */
|
|
private synchronized boolean addCall(Call call) {
|
|
private synchronized boolean addCall(Call call) {
|
|
- if (shouldCloseConnection)
|
|
|
|
|
|
+ if (shouldCloseConnection.get())
|
|
return false;
|
|
return false;
|
|
calls.put(call.id, call);
|
|
calls.put(call.id, call);
|
|
notify();
|
|
notify();
|
|
return true;
|
|
return true;
|
|
}
|
|
}
|
|
-
|
|
|
|
|
|
+
|
|
/** This class sends a ping to the remote side when timeout on
|
|
/** This class sends a ping to the remote side when timeout on
|
|
* reading. If no failure is detected, it retries until at least
|
|
* reading. If no failure is detected, it retries until at least
|
|
* a byte is read.
|
|
* a byte is read.
|
|
@@ -233,7 +238,7 @@ public class Client {
|
|
* otherwise, throw the timeout exception.
|
|
* otherwise, throw the timeout exception.
|
|
*/
|
|
*/
|
|
private void handleTimeout(SocketTimeoutException e) throws IOException {
|
|
private void handleTimeout(SocketTimeoutException e) throws IOException {
|
|
- if (shouldCloseConnection || !running) {
|
|
|
|
|
|
+ if (shouldCloseConnection.get() || !running.get()) {
|
|
throw e;
|
|
throw e;
|
|
} else {
|
|
} else {
|
|
sendPing();
|
|
sendPing();
|
|
@@ -243,6 +248,7 @@ public class Client {
|
|
/** Read a byte from the stream.
|
|
/** Read a byte from the stream.
|
|
* Send a ping if timeout on read. Retries if no failure is detected
|
|
* Send a ping if timeout on read. Retries if no failure is detected
|
|
* until a byte is read.
|
|
* until a byte is read.
|
|
|
|
+ * @throws IOException for any IO problem other than socket timeout
|
|
*/
|
|
*/
|
|
public int read() throws IOException {
|
|
public int read() throws IOException {
|
|
do {
|
|
do {
|
|
@@ -258,7 +264,7 @@ public class Client {
|
|
* Send a ping if timeout on read. Retries if no failure is detected
|
|
* Send a ping if timeout on read. Retries if no failure is detected
|
|
* until a byte is read.
|
|
* until a byte is read.
|
|
*
|
|
*
|
|
- * @Return the total number of bytes read; -1 if the connection is closed.
|
|
|
|
|
|
+ * @return the total number of bytes read; -1 if the connection is closed.
|
|
*/
|
|
*/
|
|
public int read(byte[] buf, int off, int len) throws IOException {
|
|
public int read(byte[] buf, int off, int len) throws IOException {
|
|
do {
|
|
do {
|
|
@@ -276,7 +282,7 @@ public class Client {
|
|
* the connection thread that waits for responses.
|
|
* the connection thread that waits for responses.
|
|
*/
|
|
*/
|
|
private synchronized void setupIOstreams() {
|
|
private synchronized void setupIOstreams() {
|
|
- if (socket != null || shouldCloseConnection) {
|
|
|
|
|
|
+ if (socket != null || shouldCloseConnection.get()) {
|
|
return;
|
|
return;
|
|
}
|
|
}
|
|
|
|
|
|
@@ -305,19 +311,19 @@ public class Client {
|
|
}
|
|
}
|
|
this.in = new DataInputStream(new BufferedInputStream
|
|
this.in = new DataInputStream(new BufferedInputStream
|
|
(new PingInputStream(NetUtils.getInputStream(socket))));
|
|
(new PingInputStream(NetUtils.getInputStream(socket))));
|
|
- this.out = new DataOutputStream
|
|
|
|
- (new BufferedOutputStream(NetUtils.getOutputStream(socket)));
|
|
|
|
|
|
+ this.out.set(new DataOutputStream
|
|
|
|
+ (new BufferedOutputStream(NetUtils.getOutputStream(socket))));
|
|
writeHeader();
|
|
writeHeader();
|
|
|
|
|
|
// update last activity time
|
|
// update last activity time
|
|
touch();
|
|
touch();
|
|
|
|
|
|
|
|
+ // start the receiver thread after the socket connection has been set up
|
|
|
|
+ start();
|
|
} catch (IOException e) {
|
|
} catch (IOException e) {
|
|
markClosed(e);
|
|
markClosed(e);
|
|
close();
|
|
close();
|
|
}
|
|
}
|
|
- // start the receiver thread after the socket connection has been set up
|
|
|
|
- start();
|
|
|
|
}
|
|
}
|
|
|
|
|
|
/* Handle connection failures
|
|
/* Handle connection failures
|
|
@@ -325,7 +331,10 @@ public class Client {
|
|
* If the current number of retries is equal to the max number of retries,
|
|
* If the current number of retries is equal to the max number of retries,
|
|
* stop retrying and throw the exception; Otherwise backoff 1 second and
|
|
* stop retrying and throw the exception; Otherwise backoff 1 second and
|
|
* try connecting again.
|
|
* try connecting again.
|
|
- *
|
|
|
|
|
|
+ *
|
|
|
|
+ * This Method is only called from inside setupIOstreams(), which is
|
|
|
|
+ * synchronized. Hence the sleep is synchronized; the locks will be retained.
|
|
|
|
+ *
|
|
* @param curRetries current number of retries
|
|
* @param curRetries current number of retries
|
|
* @param maxRetries max number of retries allowed
|
|
* @param maxRetries max number of retries allowed
|
|
* @param ioe failure reason
|
|
* @param ioe failure reason
|
|
@@ -361,6 +370,7 @@ public class Client {
|
|
* Out is not synchronized because only the first thread does this.
|
|
* Out is not synchronized because only the first thread does this.
|
|
*/
|
|
*/
|
|
private void writeHeader() throws IOException {
|
|
private void writeHeader() throws IOException {
|
|
|
|
+ DataOutputStream out = this.out.get();
|
|
out.write(Server.HEADER.array());
|
|
out.write(Server.HEADER.array());
|
|
out.write(Server.CURRENT_VERSION);
|
|
out.write(Server.CURRENT_VERSION);
|
|
//When there are more fields we can have ConnectionHeader Writable.
|
|
//When there are more fields we can have ConnectionHeader Writable.
|
|
@@ -379,8 +389,9 @@ public class Client {
|
|
* Return true if it is time to read a response; false otherwise.
|
|
* Return true if it is time to read a response; false otherwise.
|
|
*/
|
|
*/
|
|
private synchronized boolean waitForWork() {
|
|
private synchronized boolean waitForWork() {
|
|
- if (calls.isEmpty() && !shouldCloseConnection && running) {
|
|
|
|
- long timeout = maxIdleTime-(System.currentTimeMillis()-lastActivity);
|
|
|
|
|
|
+ if (calls.isEmpty() && !shouldCloseConnection.get() && running.get()) {
|
|
|
|
+ long timeout = maxIdleTime-
|
|
|
|
+ (System.currentTimeMillis()-lastActivity.get());
|
|
if (timeout>0) {
|
|
if (timeout>0) {
|
|
try {
|
|
try {
|
|
wait(timeout);
|
|
wait(timeout);
|
|
@@ -388,11 +399,11 @@ public class Client {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
- if (!calls.isEmpty() && !shouldCloseConnection && running) {
|
|
|
|
|
|
+ if (!calls.isEmpty() && !shouldCloseConnection.get() && running.get()) {
|
|
return true;
|
|
return true;
|
|
- } else if (shouldCloseConnection) {
|
|
|
|
|
|
+ } else if (shouldCloseConnection.get()) {
|
|
return false;
|
|
return false;
|
|
- } else if (!running) { //get stopped
|
|
|
|
|
|
+ } else if (!running.get()) { //get stopped
|
|
markClosed((IOException)new IOException().initCause(
|
|
markClosed((IOException)new IOException().initCause(
|
|
new InterruptedException()));
|
|
new InterruptedException()));
|
|
return false;
|
|
return false;
|
|
@@ -411,9 +422,10 @@ public class Client {
|
|
*/
|
|
*/
|
|
private synchronized void sendPing() throws IOException {
|
|
private synchronized void sendPing() throws IOException {
|
|
long curTime = System.currentTimeMillis();
|
|
long curTime = System.currentTimeMillis();
|
|
- if ( curTime - lastActivity >= pingInterval) {
|
|
|
|
- touch(curTime);
|
|
|
|
|
|
+ if ( curTime - lastActivity.get() >= pingInterval) {
|
|
|
|
+ lastActivity.set(curTime);
|
|
synchronized (out) {
|
|
synchronized (out) {
|
|
|
|
+ DataOutputStream out = this.out.get();
|
|
out.writeInt(PING_CALL_ID);
|
|
out.writeInt(PING_CALL_ID);
|
|
out.flush();
|
|
out.flush();
|
|
}
|
|
}
|
|
@@ -441,30 +453,36 @@ public class Client {
|
|
* threads.
|
|
* threads.
|
|
*/
|
|
*/
|
|
public void sendParam(Call call) {
|
|
public void sendParam(Call call) {
|
|
- synchronized (this) {
|
|
|
|
- if (shouldCloseConnection) {
|
|
|
|
- return;
|
|
|
|
- }
|
|
|
|
|
|
+ if (shouldCloseConnection.get()) {
|
|
|
|
+ return;
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ DataOutputBuffer d=null;
|
|
try {
|
|
try {
|
|
- synchronized (out) {
|
|
|
|
|
|
+ synchronized (this.out) {
|
|
|
|
+ DataOutputStream out = this.out.get();
|
|
|
|
+ if (out==null) return; // socket has closed
|
|
|
|
+
|
|
if (LOG.isDebugEnabled())
|
|
if (LOG.isDebugEnabled())
|
|
LOG.debug(getName() + " sending #" + call.id);
|
|
LOG.debug(getName() + " sending #" + call.id);
|
|
-
|
|
|
|
- DataOutputBuffer d = new DataOutputBuffer(); //for serializing the
|
|
|
|
|
|
+
|
|
|
|
+ //for serializing the
|
|
//data to be written
|
|
//data to be written
|
|
|
|
+ d = new DataOutputBuffer();
|
|
d.writeInt(call.id);
|
|
d.writeInt(call.id);
|
|
call.param.write(d);
|
|
call.param.write(d);
|
|
byte[] data = d.getData();
|
|
byte[] data = d.getData();
|
|
int dataLength = d.getLength();
|
|
int dataLength = d.getLength();
|
|
-
|
|
|
|
out.writeInt(dataLength); //first put the data length
|
|
out.writeInt(dataLength); //first put the data length
|
|
out.write(data, 0, dataLength);//write the data
|
|
out.write(data, 0, dataLength);//write the data
|
|
out.flush();
|
|
out.flush();
|
|
}
|
|
}
|
|
} catch(IOException e) {
|
|
} catch(IOException e) {
|
|
markClosed(e);
|
|
markClosed(e);
|
|
|
|
+ } finally {
|
|
|
|
+ //the buffer is just an in-memory buffer, but it is still polite to
|
|
|
|
+ // close early
|
|
|
|
+ IOUtils.closeStream(d);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
@@ -472,10 +490,8 @@ public class Client {
|
|
* Because only one receiver, so no synchronization on in.
|
|
* Because only one receiver, so no synchronization on in.
|
|
*/
|
|
*/
|
|
private void receiveResponse() {
|
|
private void receiveResponse() {
|
|
- synchronized (this) {
|
|
|
|
- if (shouldCloseConnection) {
|
|
|
|
- return;
|
|
|
|
- }
|
|
|
|
|
|
+ if (shouldCloseConnection.get()) {
|
|
|
|
+ return;
|
|
}
|
|
}
|
|
touch();
|
|
touch();
|
|
|
|
|
|
@@ -502,8 +518,7 @@ public class Client {
|
|
}
|
|
}
|
|
|
|
|
|
private synchronized void markClosed(IOException e) {
|
|
private synchronized void markClosed(IOException e) {
|
|
- if (!shouldCloseConnection) {
|
|
|
|
- shouldCloseConnection = true;
|
|
|
|
|
|
+ if (shouldCloseConnection.compareAndSet(false, true)) {
|
|
closeException = e;
|
|
closeException = e;
|
|
notifyAll();
|
|
notifyAll();
|
|
}
|
|
}
|
|
@@ -511,7 +526,7 @@ public class Client {
|
|
|
|
|
|
/** Close the connection. */
|
|
/** Close the connection. */
|
|
private synchronized void close() {
|
|
private synchronized void close() {
|
|
- if (!shouldCloseConnection) {
|
|
|
|
|
|
+ if (!shouldCloseConnection.get()) {
|
|
LOG.error("The connection is not in the closed state");
|
|
LOG.error("The connection is not in the closed state");
|
|
return;
|
|
return;
|
|
}
|
|
}
|
|
@@ -527,15 +542,17 @@ public class Client {
|
|
|
|
|
|
// close the socket and streams
|
|
// close the socket and streams
|
|
IOUtils.closeStream(in);
|
|
IOUtils.closeStream(in);
|
|
- IOUtils.closeStream(out);
|
|
|
|
|
|
+ in = null;
|
|
|
|
+ IOUtils.closeStream(out.getAndSet(null));
|
|
IOUtils.closeSocket(socket);
|
|
IOUtils.closeSocket(socket);
|
|
|
|
+ socket = null;
|
|
|
|
|
|
// clean up all calls
|
|
// clean up all calls
|
|
if (closeException == null) {
|
|
if (closeException == null) {
|
|
if (!calls.isEmpty()) {
|
|
if (!calls.isEmpty()) {
|
|
LOG.warn(
|
|
LOG.warn(
|
|
"A connection is closed for no cause and calls are not empty");
|
|
"A connection is closed for no cause and calls are not empty");
|
|
-
|
|
|
|
|
|
+
|
|
// clean up calls anyway
|
|
// clean up calls anyway
|
|
closeException = new IOException("Unexpected closed connection");
|
|
closeException = new IOException("Unexpected closed connection");
|
|
cleanupCalls();
|
|
cleanupCalls();
|
|
@@ -544,7 +561,7 @@ public class Client {
|
|
// log the info
|
|
// log the info
|
|
if (LOG.isDebugEnabled()) {
|
|
if (LOG.isDebugEnabled()) {
|
|
LOG.debug("closing ipc connection to " + remoteId.address + ": " +
|
|
LOG.debug("closing ipc connection to " + remoteId.address + ": " +
|
|
- StringUtils.stringifyException(closeException));
|
|
|
|
|
|
+ closeException.getMessage(),closeException);
|
|
}
|
|
}
|
|
|
|
|
|
// cleanup calls
|
|
// cleanup calls
|
|
@@ -643,11 +660,10 @@ public class Client {
|
|
if (LOG.isDebugEnabled()) {
|
|
if (LOG.isDebugEnabled()) {
|
|
LOG.debug("Stopping client");
|
|
LOG.debug("Stopping client");
|
|
}
|
|
}
|
|
-
|
|
|
|
- if (running == false) {
|
|
|
|
|
|
+
|
|
|
|
+ if (!running.compareAndSet(true, false)) {
|
|
return;
|
|
return;
|
|
}
|
|
}
|
|
- running = false;
|
|
|
|
|
|
|
|
// wake up all connections
|
|
// wake up all connections
|
|
synchronized (connections) {
|
|
synchronized (connections) {
|
|
@@ -716,8 +732,9 @@ public class Client {
|
|
Connection connection = getConnection(addresses[i], null, call);
|
|
Connection connection = getConnection(addresses[i], null, call);
|
|
connection.sendParam(call); // send each parameter
|
|
connection.sendParam(call); // send each parameter
|
|
} catch (IOException e) {
|
|
} catch (IOException e) {
|
|
|
|
+ // log errors
|
|
LOG.info("Calling "+addresses[i]+" caught: " +
|
|
LOG.info("Calling "+addresses[i]+" caught: " +
|
|
- StringUtils.stringifyException(e)); // log errors
|
|
|
|
|
|
+ e.getMessage(),e);
|
|
results.size--; // wait for one fewer result
|
|
results.size--; // wait for one fewer result
|
|
}
|
|
}
|
|
}
|
|
}
|
|
@@ -737,11 +754,9 @@ public class Client {
|
|
UserGroupInformation ticket,
|
|
UserGroupInformation ticket,
|
|
Call call)
|
|
Call call)
|
|
throws IOException {
|
|
throws IOException {
|
|
- synchronized (this) {
|
|
|
|
- if (!running) {
|
|
|
|
- // the client is stopped
|
|
|
|
- throw new IOException("The client is stopped");
|
|
|
|
- }
|
|
|
|
|
|
+ if (!running.get()) {
|
|
|
|
+ // the client is stopped
|
|
|
|
+ throw new IOException("The client is stopped");
|
|
}
|
|
}
|
|
Connection connection;
|
|
Connection connection;
|
|
/* we could avoid this allocation for each RPC by having a
|
|
/* we could avoid this allocation for each RPC by having a
|