|
@@ -18,40 +18,41 @@
|
|
|
|
|
|
package org.apache.hadoop.ipc;
|
|
package org.apache.hadoop.ipc;
|
|
|
|
|
|
-import java.net.InetAddress;
|
|
|
|
-import java.net.Socket;
|
|
|
|
-import java.net.InetSocketAddress;
|
|
|
|
-import java.net.SocketTimeoutException;
|
|
|
|
-import java.net.UnknownHostException;
|
|
|
|
-import java.net.ConnectException;
|
|
|
|
-
|
|
|
|
-import java.io.IOException;
|
|
|
|
-import java.io.DataInputStream;
|
|
|
|
-import java.io.DataOutputStream;
|
|
|
|
import java.io.BufferedInputStream;
|
|
import java.io.BufferedInputStream;
|
|
import java.io.BufferedOutputStream;
|
|
import java.io.BufferedOutputStream;
|
|
|
|
+import java.io.DataInputStream;
|
|
|
|
+import java.io.DataOutputStream;
|
|
import java.io.FilterInputStream;
|
|
import java.io.FilterInputStream;
|
|
|
|
+import java.io.IOException;
|
|
import java.io.InputStream;
|
|
import java.io.InputStream;
|
|
import java.io.OutputStream;
|
|
import java.io.OutputStream;
|
|
-
|
|
|
|
|
|
+import java.net.ConnectException;
|
|
|
|
+import java.net.InetAddress;
|
|
|
|
+import java.net.InetSocketAddress;
|
|
|
|
+import java.net.Socket;
|
|
|
|
+import java.net.SocketTimeoutException;
|
|
|
|
+import java.net.UnknownHostException;
|
|
import java.security.PrivilegedExceptionAction;
|
|
import java.security.PrivilegedExceptionAction;
|
|
import java.util.Hashtable;
|
|
import java.util.Hashtable;
|
|
import java.util.Iterator;
|
|
import java.util.Iterator;
|
|
|
|
+import java.util.Map.Entry;
|
|
import java.util.Random;
|
|
import java.util.Random;
|
|
import java.util.Set;
|
|
import java.util.Set;
|
|
-import java.util.Map.Entry;
|
|
|
|
|
|
+import java.util.concurrent.TimeUnit;
|
|
import java.util.concurrent.atomic.AtomicBoolean;
|
|
import java.util.concurrent.atomic.AtomicBoolean;
|
|
import java.util.concurrent.atomic.AtomicLong;
|
|
import java.util.concurrent.atomic.AtomicLong;
|
|
|
|
|
|
import javax.net.SocketFactory;
|
|
import javax.net.SocketFactory;
|
|
|
|
|
|
-import org.apache.commons.logging.*;
|
|
|
|
-
|
|
|
|
|
|
+import org.apache.commons.logging.Log;
|
|
|
|
+import org.apache.commons.logging.LogFactory;
|
|
import org.apache.hadoop.conf.Configuration;
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
|
+import org.apache.hadoop.io.DataOutputBuffer;
|
|
import org.apache.hadoop.io.IOUtils;
|
|
import org.apache.hadoop.io.IOUtils;
|
|
import org.apache.hadoop.io.Writable;
|
|
import org.apache.hadoop.io.Writable;
|
|
import org.apache.hadoop.io.WritableUtils;
|
|
import org.apache.hadoop.io.WritableUtils;
|
|
-import org.apache.hadoop.io.DataOutputBuffer;
|
|
|
|
|
|
+import org.apache.hadoop.io.retry.RetryPolicies;
|
|
|
|
+import org.apache.hadoop.io.retry.RetryPolicy;
|
|
import org.apache.hadoop.net.NetUtils;
|
|
import org.apache.hadoop.net.NetUtils;
|
|
import org.apache.hadoop.security.KerberosInfo;
|
|
import org.apache.hadoop.security.KerberosInfo;
|
|
import org.apache.hadoop.security.SaslRpcClient;
|
|
import org.apache.hadoop.security.SaslRpcClient;
|
|
@@ -60,8 +61,8 @@ import org.apache.hadoop.security.SecurityUtil;
|
|
import org.apache.hadoop.security.UserGroupInformation;
|
|
import org.apache.hadoop.security.UserGroupInformation;
|
|
import org.apache.hadoop.security.token.Token;
|
|
import org.apache.hadoop.security.token.Token;
|
|
import org.apache.hadoop.security.token.TokenIdentifier;
|
|
import org.apache.hadoop.security.token.TokenIdentifier;
|
|
-import org.apache.hadoop.security.token.TokenSelector;
|
|
|
|
import org.apache.hadoop.security.token.TokenInfo;
|
|
import org.apache.hadoop.security.token.TokenInfo;
|
|
|
|
+import org.apache.hadoop.security.token.TokenSelector;
|
|
import org.apache.hadoop.util.ReflectionUtils;
|
|
import org.apache.hadoop.util.ReflectionUtils;
|
|
|
|
|
|
/** A client for an IPC service. IPC calls take a single {@link Writable} as a
|
|
/** A client for an IPC service. IPC calls take a single {@link Writable} as a
|
|
@@ -71,7 +72,9 @@ import org.apache.hadoop.util.ReflectionUtils;
|
|
* @see Server
|
|
* @see Server
|
|
*/
|
|
*/
|
|
public class Client {
|
|
public class Client {
|
|
-
|
|
|
|
|
|
+ public static final String IPC_CLIENT_CONNECT_MAX_RETRIES_KEY = "ipc.client.connect.max.retries";
|
|
|
|
+ public static final int IPC_CLIENT_CONNECT_MAX_RETRIES_DEFAULT = 10;
|
|
|
|
+
|
|
public static final Log LOG =
|
|
public static final Log LOG =
|
|
LogFactory.getLog(Client.class);
|
|
LogFactory.getLog(Client.class);
|
|
private Hashtable<ConnectionId, Connection> connections =
|
|
private Hashtable<ConnectionId, Connection> connections =
|
|
@@ -197,9 +200,10 @@ public class Client {
|
|
private int rpcTimeout;
|
|
private int rpcTimeout;
|
|
private int maxIdleTime; //connections will be culled if it was idle for
|
|
private int maxIdleTime; //connections will be culled if it was idle for
|
|
//maxIdleTime msecs
|
|
//maxIdleTime msecs
|
|
- private int maxRetries; //the max. no. of retries for socket connections
|
|
|
|
|
|
+ private final RetryPolicy connectionRetryPolicy;
|
|
private boolean tcpNoDelay; // if T then disable Nagle's Algorithm
|
|
private boolean tcpNoDelay; // if T then disable Nagle's Algorithm
|
|
private int pingInterval; // how often sends ping to the server in msecs
|
|
private int pingInterval; // how often sends ping to the server in msecs
|
|
|
|
+
|
|
|
|
|
|
// currently active calls
|
|
// currently active calls
|
|
private Hashtable<Integer, Call> calls = new Hashtable<Integer, Call>();
|
|
private Hashtable<Integer, Call> calls = new Hashtable<Integer, Call>();
|
|
@@ -215,7 +219,7 @@ public class Client {
|
|
remoteId.getAddress().getHostName());
|
|
remoteId.getAddress().getHostName());
|
|
}
|
|
}
|
|
this.maxIdleTime = remoteId.getMaxIdleTime();
|
|
this.maxIdleTime = remoteId.getMaxIdleTime();
|
|
- this.maxRetries = remoteId.getMaxRetries();
|
|
|
|
|
|
+ this.connectionRetryPolicy = remoteId.connectionRetryPolicy;
|
|
this.tcpNoDelay = remoteId.getTcpNoDelay();
|
|
this.tcpNoDelay = remoteId.getTcpNoDelay();
|
|
this.pingInterval = remoteId.getPingInterval();
|
|
this.pingInterval = remoteId.getPingInterval();
|
|
if (LOG.isDebugEnabled()) {
|
|
if (LOG.isDebugEnabled()) {
|
|
@@ -453,7 +457,7 @@ public class Client {
|
|
if (updateAddress()) {
|
|
if (updateAddress()) {
|
|
timeoutFailures = ioFailures = 0;
|
|
timeoutFailures = ioFailures = 0;
|
|
}
|
|
}
|
|
- handleConnectionFailure(ioFailures++, maxRetries, ie);
|
|
|
|
|
|
+ handleConnectionFailure(ioFailures++, ie);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
@@ -663,8 +667,26 @@ public class Client {
|
|
Thread.sleep(1000);
|
|
Thread.sleep(1000);
|
|
} catch (InterruptedException ignored) {}
|
|
} catch (InterruptedException ignored) {}
|
|
|
|
|
|
- LOG.info("Retrying connect to server: " + server +
|
|
|
|
- ". Already tried " + curRetries + " time(s).");
|
|
|
|
|
|
+ LOG.info("Retrying connect to server: " + server + ". Already tried "
|
|
|
|
+ + curRetries + " time(s); maxRetries=" + maxRetries);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private void handleConnectionFailure(int curRetries, IOException ioe
|
|
|
|
+ ) throws IOException {
|
|
|
|
+ closeConnection();
|
|
|
|
+
|
|
|
|
+ final boolean retry;
|
|
|
|
+ try {
|
|
|
|
+ retry = connectionRetryPolicy.shouldRetry(ioe, curRetries);
|
|
|
|
+ } catch(Exception e) {
|
|
|
|
+ throw e instanceof IOException? (IOException)e: new IOException(e);
|
|
|
|
+ }
|
|
|
|
+ if (!retry) {
|
|
|
|
+ throw ioe;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ LOG.info("Retrying connect to server: " + server + ". Already tried "
|
|
|
|
+ + curRetries + " time(s); retry policy is " + connectionRetryPolicy);
|
|
}
|
|
}
|
|
|
|
|
|
/* Write the RPC header */
|
|
/* Write the RPC header */
|
|
@@ -1220,14 +1242,15 @@ public class Client {
|
|
private String serverPrincipal;
|
|
private String serverPrincipal;
|
|
private int maxIdleTime; //connections will be culled if it was idle for
|
|
private int maxIdleTime; //connections will be culled if it was idle for
|
|
//maxIdleTime msecs
|
|
//maxIdleTime msecs
|
|
- private int maxRetries; //the max. no. of retries for socket connections
|
|
|
|
|
|
+ private final RetryPolicy connectionRetryPolicy;
|
|
private boolean tcpNoDelay; // if T then disable Nagle's Algorithm
|
|
private boolean tcpNoDelay; // if T then disable Nagle's Algorithm
|
|
private int pingInterval; // how often sends ping to the server in msecs
|
|
private int pingInterval; // how often sends ping to the server in msecs
|
|
|
|
+
|
|
|
|
|
|
ConnectionId(InetSocketAddress address, Class<?> protocol,
|
|
ConnectionId(InetSocketAddress address, Class<?> protocol,
|
|
UserGroupInformation ticket, int rpcTimeout,
|
|
UserGroupInformation ticket, int rpcTimeout,
|
|
String serverPrincipal, int maxIdleTime,
|
|
String serverPrincipal, int maxIdleTime,
|
|
- int maxRetries, boolean tcpNoDelay,
|
|
|
|
|
|
+ RetryPolicy connectionRetryPolicy, boolean tcpNoDelay,
|
|
int pingInterval) {
|
|
int pingInterval) {
|
|
this.protocol = protocol;
|
|
this.protocol = protocol;
|
|
this.address = address;
|
|
this.address = address;
|
|
@@ -1235,7 +1258,7 @@ public class Client {
|
|
this.rpcTimeout = rpcTimeout;
|
|
this.rpcTimeout = rpcTimeout;
|
|
this.serverPrincipal = serverPrincipal;
|
|
this.serverPrincipal = serverPrincipal;
|
|
this.maxIdleTime = maxIdleTime;
|
|
this.maxIdleTime = maxIdleTime;
|
|
- this.maxRetries = maxRetries;
|
|
|
|
|
|
+ this.connectionRetryPolicy = connectionRetryPolicy;
|
|
this.tcpNoDelay = tcpNoDelay;
|
|
this.tcpNoDelay = tcpNoDelay;
|
|
this.pingInterval = pingInterval;
|
|
this.pingInterval = pingInterval;
|
|
}
|
|
}
|
|
@@ -1264,10 +1287,6 @@ public class Client {
|
|
return maxIdleTime;
|
|
return maxIdleTime;
|
|
}
|
|
}
|
|
|
|
|
|
- int getMaxRetries() {
|
|
|
|
- return maxRetries;
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
boolean getTcpNoDelay() {
|
|
boolean getTcpNoDelay() {
|
|
return tcpNoDelay;
|
|
return tcpNoDelay;
|
|
}
|
|
}
|
|
@@ -1285,11 +1304,26 @@ public class Client {
|
|
static ConnectionId getConnectionId(InetSocketAddress addr,
|
|
static ConnectionId getConnectionId(InetSocketAddress addr,
|
|
Class<?> protocol, UserGroupInformation ticket, int rpcTimeout,
|
|
Class<?> protocol, UserGroupInformation ticket, int rpcTimeout,
|
|
Configuration conf) throws IOException {
|
|
Configuration conf) throws IOException {
|
|
|
|
+ return getConnectionId(addr, protocol, ticket, rpcTimeout, null, conf);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ static ConnectionId getConnectionId(InetSocketAddress addr,
|
|
|
|
+ Class<?> protocol, UserGroupInformation ticket, int rpcTimeout,
|
|
|
|
+ RetryPolicy connectionRetryPolicy, Configuration conf) throws IOException {
|
|
|
|
+
|
|
|
|
+ if (connectionRetryPolicy == null) {
|
|
|
|
+ final int max = conf.getInt(
|
|
|
|
+ IPC_CLIENT_CONNECT_MAX_RETRIES_KEY,
|
|
|
|
+ IPC_CLIENT_CONNECT_MAX_RETRIES_DEFAULT);
|
|
|
|
+ connectionRetryPolicy = RetryPolicies.retryUpToMaximumCountWithFixedSleep(
|
|
|
|
+ max, 1, TimeUnit.SECONDS);
|
|
|
|
+ }
|
|
|
|
+
|
|
String remotePrincipal = getRemotePrincipal(conf, addr, protocol);
|
|
String remotePrincipal = getRemotePrincipal(conf, addr, protocol);
|
|
return new ConnectionId(addr, protocol, ticket,
|
|
return new ConnectionId(addr, protocol, ticket,
|
|
rpcTimeout, remotePrincipal,
|
|
rpcTimeout, remotePrincipal,
|
|
conf.getInt("ipc.client.connection.maxidletime", 10000), // 10s
|
|
conf.getInt("ipc.client.connection.maxidletime", 10000), // 10s
|
|
- conf.getInt("ipc.client.connect.max.retries", 10),
|
|
|
|
|
|
+ connectionRetryPolicy,
|
|
conf.getBoolean("ipc.client.tcpnodelay", false),
|
|
conf.getBoolean("ipc.client.tcpnodelay", false),
|
|
Client.getPingInterval(conf));
|
|
Client.getPingInterval(conf));
|
|
}
|
|
}
|
|
@@ -1326,7 +1360,7 @@ public class Client {
|
|
ConnectionId that = (ConnectionId) obj;
|
|
ConnectionId that = (ConnectionId) obj;
|
|
return isEqual(this.address, that.address)
|
|
return isEqual(this.address, that.address)
|
|
&& this.maxIdleTime == that.maxIdleTime
|
|
&& this.maxIdleTime == that.maxIdleTime
|
|
- && this.maxRetries == that.maxRetries
|
|
|
|
|
|
+ && isEqual(this.connectionRetryPolicy, that.connectionRetryPolicy)
|
|
&& this.pingInterval == that.pingInterval
|
|
&& this.pingInterval == that.pingInterval
|
|
&& isEqual(this.protocol, that.protocol)
|
|
&& isEqual(this.protocol, that.protocol)
|
|
&& this.rpcTimeout == that.rpcTimeout
|
|
&& this.rpcTimeout == that.rpcTimeout
|
|
@@ -1339,10 +1373,9 @@ public class Client {
|
|
|
|
|
|
@Override
|
|
@Override
|
|
public int hashCode() {
|
|
public int hashCode() {
|
|
- int result = 1;
|
|
|
|
|
|
+ int result = connectionRetryPolicy.hashCode();
|
|
result = PRIME * result + ((address == null) ? 0 : address.hashCode());
|
|
result = PRIME * result + ((address == null) ? 0 : address.hashCode());
|
|
result = PRIME * result + maxIdleTime;
|
|
result = PRIME * result + maxIdleTime;
|
|
- result = PRIME * result + maxRetries;
|
|
|
|
result = PRIME * result + pingInterval;
|
|
result = PRIME * result + pingInterval;
|
|
result = PRIME * result + ((protocol == null) ? 0 : protocol.hashCode());
|
|
result = PRIME * result + ((protocol == null) ? 0 : protocol.hashCode());
|
|
result = PRIME * rpcTimeout;
|
|
result = PRIME * rpcTimeout;
|