|
@@ -57,6 +57,7 @@ import org.apache.hadoop.io.retry.FailoverProxyProvider;
|
|
|
import org.apache.hadoop.io.retry.RetryPolicies;
|
|
|
import org.apache.hadoop.io.retry.RetryPolicy;
|
|
|
import org.apache.hadoop.io.retry.RetryProxy;
|
|
|
+import org.apache.hadoop.io.retry.RetryUtils;
|
|
|
import org.apache.hadoop.ipc.ProtobufRpcEngine;
|
|
|
import org.apache.hadoop.ipc.RPC;
|
|
|
import org.apache.hadoop.ipc.RemoteException;
|
|
@@ -68,7 +69,6 @@ import org.apache.hadoop.security.authorize.RefreshAuthorizationPolicyProtocol;
|
|
|
import org.apache.hadoop.tools.GetUserMappingsProtocol;
|
|
|
|
|
|
import com.google.common.base.Preconditions;
|
|
|
-import com.google.protobuf.ServiceException;
|
|
|
|
|
|
/**
|
|
|
* Create proxy objects to communicate with a remote NN. All remote access to an
|
|
@@ -243,106 +243,20 @@ public class NameNodeProxies {
|
|
|
return new NamenodeProtocolTranslatorPB(proxy);
|
|
|
}
|
|
|
|
|
|
- /**
|
|
|
- * Return the default retry policy used in RPC.
|
|
|
- *
|
|
|
- * If dfs.client.retry.policy.enabled == false, use TRY_ONCE_THEN_FAIL.
|
|
|
- *
|
|
|
- * Otherwise, first unwrap ServiceException if possible, and then
|
|
|
- * (1) use multipleLinearRandomRetry for
|
|
|
- * - SafeModeException, or
|
|
|
- * - IOException other than RemoteException, or
|
|
|
- * - ServiceException; and
|
|
|
- * (2) use TRY_ONCE_THEN_FAIL for
|
|
|
- * - non-SafeMode RemoteException, or
|
|
|
- * - non-IOException.
|
|
|
- *
|
|
|
- * Note that dfs.client.retry.max < 0 is not allowed.
|
|
|
- */
|
|
|
- public static RetryPolicy getDefaultRetryPolicy(Configuration conf) {
|
|
|
- final RetryPolicy multipleLinearRandomRetry = getMultipleLinearRandomRetry(conf);
|
|
|
- if (LOG.isDebugEnabled()) {
|
|
|
- LOG.debug("multipleLinearRandomRetry = " + multipleLinearRandomRetry);
|
|
|
- }
|
|
|
- if (multipleLinearRandomRetry == null) {
|
|
|
- //no retry
|
|
|
- return RetryPolicies.TRY_ONCE_THEN_FAIL;
|
|
|
- } else {
|
|
|
- return new RetryPolicy() {
|
|
|
- @Override
|
|
|
- public RetryAction shouldRetry(Exception e, int retries, int failovers,
|
|
|
- boolean isMethodIdempotent) throws Exception {
|
|
|
- if (e instanceof ServiceException) {
|
|
|
- //unwrap ServiceException
|
|
|
- final Throwable cause = e.getCause();
|
|
|
- if (cause != null && cause instanceof Exception) {
|
|
|
- e = (Exception)cause;
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- //see (1) and (2) in the javadoc of this method.
|
|
|
- final RetryPolicy p;
|
|
|
- if (e instanceof RemoteException) {
|
|
|
- final RemoteException re = (RemoteException)e;
|
|
|
- p = SafeModeException.class.getName().equals(re.getClassName())?
|
|
|
- multipleLinearRandomRetry: RetryPolicies.TRY_ONCE_THEN_FAIL;
|
|
|
- } else if (e instanceof IOException || e instanceof ServiceException) {
|
|
|
- p = multipleLinearRandomRetry;
|
|
|
- } else { //non-IOException
|
|
|
- p = RetryPolicies.TRY_ONCE_THEN_FAIL;
|
|
|
- }
|
|
|
-
|
|
|
- if (LOG.isDebugEnabled()) {
|
|
|
- LOG.debug("RETRY " + retries + ") policy="
|
|
|
- + p.getClass().getSimpleName() + ", exception=" + e);
|
|
|
- }
|
|
|
- LOG.info("RETRY " + retries + ") policy="
|
|
|
- + p.getClass().getSimpleName() + ", exception=" + e);
|
|
|
- return p.shouldRetry(e, retries, failovers, isMethodIdempotent);
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public String toString() {
|
|
|
- return "RetryPolicy[" + multipleLinearRandomRetry + ", "
|
|
|
- + RetryPolicies.TRY_ONCE_THEN_FAIL.getClass().getSimpleName()
|
|
|
- + "]";
|
|
|
- }
|
|
|
- };
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Return the MultipleLinearRandomRetry policy specified in the conf,
|
|
|
- * or null if the feature is disabled.
|
|
|
- * If the policy is specified in the conf but the policy cannot be parsed,
|
|
|
- * the default policy is returned.
|
|
|
- *
|
|
|
- * Conf property: N pairs of sleep-time and number-of-retries
|
|
|
- * dfs.client.retry.policy = "s1,n1,s2,n2,..."
|
|
|
- */
|
|
|
- private static RetryPolicy getMultipleLinearRandomRetry(Configuration conf) {
|
|
|
- final boolean enabled = conf.getBoolean(
|
|
|
- DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_ENABLED_KEY,
|
|
|
- DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_ENABLED_DEFAULT);
|
|
|
- if (!enabled) {
|
|
|
- return null;
|
|
|
- }
|
|
|
-
|
|
|
- final String policy = conf.get(
|
|
|
- DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_SPEC_KEY,
|
|
|
- DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_SPEC_DEFAULT);
|
|
|
-
|
|
|
- final RetryPolicy r = RetryPolicies.MultipleLinearRandomRetry.parseCommaSeparatedString(policy);
|
|
|
- return r != null? r: RetryPolicies.MultipleLinearRandomRetry.parseCommaSeparatedString(
|
|
|
- DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_SPEC_DEFAULT);
|
|
|
- }
|
|
|
-
|
|
|
private static ClientProtocol createNNProxyWithClientProtocol(
|
|
|
InetSocketAddress address, Configuration conf, UserGroupInformation ugi,
|
|
|
boolean withRetries) throws IOException {
|
|
|
RPC.setProtocolEngine(conf, ClientNamenodeProtocolPB.class, ProtobufRpcEngine.class);
|
|
|
|
|
|
- final RetryPolicy defaultPolicy = getDefaultRetryPolicy(conf);
|
|
|
+ final RetryPolicy defaultPolicy =
|
|
|
+ RetryUtils.getDefaultRetryPolicy(
|
|
|
+ conf,
|
|
|
+ DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_ENABLED_KEY,
|
|
|
+ DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_ENABLED_DEFAULT,
|
|
|
+ DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_SPEC_KEY,
|
|
|
+ DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_SPEC_DEFAULT,
|
|
|
+ SafeModeException.class);
|
|
|
+
|
|
|
final long version = RPC.getProtocolVersion(ClientNamenodeProtocolPB.class);
|
|
|
ClientNamenodeProtocolPB proxy = RPC.getProtocolProxy(
|
|
|
ClientNamenodeProtocolPB.class, version, address, ugi, conf,
|