|
@@ -383,15 +383,16 @@ public class Client {
|
|
|
private Socket socket = null; // connected socket
|
|
|
private DataInputStream in;
|
|
|
private DataOutputStream out;
|
|
|
- private int rpcTimeout;
|
|
|
+ private final int rpcTimeout;
|
|
|
private int maxIdleTime; //connections will be culled if it was idle for
|
|
|
//maxIdleTime msecs
|
|
|
private final RetryPolicy connectionRetryPolicy;
|
|
|
private final int maxRetriesOnSasl;
|
|
|
private int maxRetriesOnSocketTimeouts;
|
|
|
- private boolean tcpNoDelay; // if T then disable Nagle's Algorithm
|
|
|
- private boolean doPing; //do we need to send ping message
|
|
|
- private int pingInterval; // how often sends ping to the server in msecs
|
|
|
+ private final boolean tcpNoDelay; // if T then disable Nagle's Algorithm
|
|
|
+ private final boolean doPing; //do we need to send ping message
|
|
|
+ private final int pingInterval; // how often sends ping to the server
|
|
|
+ private final int soTimeout; // used by ipc ping and rpc timeout
|
|
|
private ByteArrayOutputStream pingRequest; // ping message
|
|
|
|
|
|
// currently active calls
|
|
@@ -429,6 +430,9 @@ public class Client {
|
|
|
pingHeader.writeDelimitedTo(pingRequest);
|
|
|
}
|
|
|
this.pingInterval = remoteId.getPingInterval();
|
|
|
+ this.soTimeout =
|
|
|
+ (rpcTimeout == 0 || (doPing && pingInterval < rpcTimeout))?
|
|
|
+ this.pingInterval : this.rpcTimeout;
|
|
|
this.serviceClass = serviceClass;
|
|
|
if (LOG.isDebugEnabled()) {
|
|
|
LOG.debug("The ping interval is " + this.pingInterval + " ms.");
|
|
@@ -479,12 +483,12 @@ public class Client {
|
|
|
|
|
|
/* Process timeout exception
|
|
|
* if the connection is not going to be closed or
|
|
|
- * is not configured to have a RPC timeout, send a ping.
|
|
|
- * (if rpcTimeout is not set to be 0, then RPC should timeout.
|
|
|
- * otherwise, throw the timeout exception.
|
|
|
+ * the RPC is not timed out yet, send a ping.
|
|
|
*/
|
|
|
- private void handleTimeout(SocketTimeoutException e) throws IOException {
|
|
|
- if (shouldCloseConnection.get() || !running.get() || rpcTimeout > 0) {
|
|
|
+ private void handleTimeout(SocketTimeoutException e, int waiting)
|
|
|
+ throws IOException {
|
|
|
+ if (shouldCloseConnection.get() || !running.get() ||
|
|
|
+ (0 < rpcTimeout && rpcTimeout <= waiting)) {
|
|
|
throw e;
|
|
|
} else {
|
|
|
sendPing();
|
|
@@ -498,11 +502,13 @@ public class Client {
|
|
|
*/
|
|
|
@Override
|
|
|
public int read() throws IOException {
|
|
|
+ int waiting = 0;
|
|
|
do {
|
|
|
try {
|
|
|
return super.read();
|
|
|
} catch (SocketTimeoutException e) {
|
|
|
- handleTimeout(e);
|
|
|
+ waiting += soTimeout;
|
|
|
+ handleTimeout(e, waiting);
|
|
|
}
|
|
|
} while (true);
|
|
|
}
|
|
@@ -515,11 +521,13 @@ public class Client {
|
|
|
*/
|
|
|
@Override
|
|
|
public int read(byte[] buf, int off, int len) throws IOException {
|
|
|
+ int waiting = 0;
|
|
|
do {
|
|
|
try {
|
|
|
return super.read(buf, off, len);
|
|
|
} catch (SocketTimeoutException e) {
|
|
|
- handleTimeout(e);
|
|
|
+ waiting += soTimeout;
|
|
|
+ handleTimeout(e, waiting);
|
|
|
}
|
|
|
} while (true);
|
|
|
}
|
|
@@ -612,10 +620,7 @@ public class Client {
|
|
|
}
|
|
|
|
|
|
NetUtils.connect(this.socket, server, connectionTimeout);
|
|
|
- if (rpcTimeout > 0) {
|
|
|
- pingInterval = rpcTimeout; // rpcTimeout overwrites pingInterval
|
|
|
- }
|
|
|
- this.socket.setSoTimeout(pingInterval);
|
|
|
+ this.socket.setSoTimeout(soTimeout);
|
|
|
return;
|
|
|
} catch (ConnectTimeoutException toe) {
|
|
|
/* Check for an address change and update the local reference.
|