|
@@ -21,6 +21,7 @@ import org.apache.hadoop.io.*;
|
|
|
import org.apache.hadoop.io.retry.RetryPolicies;
|
|
|
import org.apache.hadoop.io.retry.RetryPolicy;
|
|
|
import org.apache.hadoop.io.retry.RetryProxy;
|
|
|
+import org.apache.hadoop.io.retry.RetryUtils;
|
|
|
import org.apache.hadoop.fs.*;
|
|
|
import org.apache.hadoop.fs.permission.FsPermission;
|
|
|
import org.apache.hadoop.ipc.*;
|
|
@@ -124,98 +125,28 @@ public class DFSClient implements FSConstants, java.io.Closeable {
|
|
|
return (ClientProtocol)RPC.getProxy(ClientProtocol.class,
|
|
|
ClientProtocol.versionID, nameNodeAddr, ugi, conf,
|
|
|
NetUtils.getSocketFactory(conf, ClientProtocol.class), 0,
|
|
|
- getMultipleLinearRandomRetry(conf));
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Return the default retry policy used in RPC.
|
|
|
- *
|
|
|
- * If dfs.client.retry.policy.enabled == false, use TRY_ONCE_THEN_FAIL.
|
|
|
- *
|
|
|
- * Otherwise,
|
|
|
- * (1) use multipleLinearRandomRetry for
|
|
|
- * - SafeModeException, or
|
|
|
- * - IOException other than RemoteException; and
|
|
|
- * (2) use TRY_ONCE_THEN_FAIL for
|
|
|
- * - non-SafeMode RemoteException, or
|
|
|
- * - non-IOException.
|
|
|
- *
|
|
|
- * Note that dfs.client.retry.max < 0 is not allowed.
|
|
|
- */
|
|
|
- public static RetryPolicy getDefaultRetryPolicy(Configuration conf) {
|
|
|
- final RetryPolicy multipleLinearRandomRetry = getMultipleLinearRandomRetry(conf);
|
|
|
- if (LOG.isDebugEnabled()) {
|
|
|
- LOG.debug("multipleLinearRandomRetry = " + multipleLinearRandomRetry);
|
|
|
+ RetryUtils.getMultipleLinearRandomRetry(
|
|
|
+ conf,
|
|
|
+ DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_ENABLED_KEY,
|
|
|
+ DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_ENABLED_DEFAULT,
|
|
|
+ DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_SPEC_KEY,
|
|
|
+ DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_SPEC_DEFAULT
|
|
|
+ ));
|
|
|
}
|
|
|
|
|
|
- if (multipleLinearRandomRetry == null) {
|
|
|
- //no retry
|
|
|
- return RetryPolicies.TRY_ONCE_THEN_FAIL;
|
|
|
- } else {
|
|
|
- //use exponential backoff
|
|
|
- return new RetryPolicy() {
|
|
|
- @Override
|
|
|
- public boolean shouldRetry(Exception e, int retries) throws Exception {
|
|
|
- //see (1) and (2) in the javadoc of this method.
|
|
|
- final RetryPolicy p;
|
|
|
- if (e instanceof RemoteException) {
|
|
|
- final RemoteException re = (RemoteException)e;
|
|
|
- p = SafeModeException.class.getName().equals(re.getClassName())?
|
|
|
- multipleLinearRandomRetry: RetryPolicies.TRY_ONCE_THEN_FAIL;
|
|
|
- } else if (e instanceof IOException) {
|
|
|
- p = multipleLinearRandomRetry;
|
|
|
- } else { //non-IOException
|
|
|
- p = RetryPolicies.TRY_ONCE_THEN_FAIL;
|
|
|
- }
|
|
|
-
|
|
|
- if (LOG.isDebugEnabled()) {
|
|
|
- LOG.debug("RETRY " + retries + ") policy="
|
|
|
- + p.getClass().getSimpleName() + ", exception=" + e);
|
|
|
- }
|
|
|
- return p.shouldRetry(e, retries);
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public String toString() {
|
|
|
- return "RetryPolicy[" + multipleLinearRandomRetry + ", "
|
|
|
- + RetryPolicies.TRY_ONCE_THEN_FAIL.getClass().getSimpleName()
|
|
|
- + "]";
|
|
|
- }
|
|
|
- };
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Return the MultipleLinearRandomRetry policy specified in the conf,
|
|
|
- * or null if the feature is disabled.
|
|
|
- * If the policy is specified in the conf but the policy cannot be parsed,
|
|
|
- * the default policy is returned.
|
|
|
- *
|
|
|
- * Conf property: N pairs of sleep-time and number-of-retries
|
|
|
- * dfs.client.retry.policy = "s1,n1,s2,n2,..."
|
|
|
- */
|
|
|
- private static RetryPolicy getMultipleLinearRandomRetry(Configuration conf) {
|
|
|
- final boolean enabled = conf.getBoolean(
|
|
|
- DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_ENABLED_KEY,
|
|
|
- DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_ENABLED_DEFAULT);
|
|
|
- if (!enabled) {
|
|
|
- return null;
|
|
|
- }
|
|
|
-
|
|
|
- final String policy = conf.get(
|
|
|
- DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_SPEC_KEY,
|
|
|
- DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_SPEC_DEFAULT);
|
|
|
-
|
|
|
- final RetryPolicy r = RetryPolicies.MultipleLinearRandomRetry.parseCommaSeparatedString(policy);
|
|
|
- return r != null? r: RetryPolicies.MultipleLinearRandomRetry.parseCommaSeparatedString(
|
|
|
- DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_SPEC_DEFAULT);
|
|
|
- }
|
|
|
-
|
|
|
private static ClientProtocol createNamenode(ClientProtocol rpcNamenode,
|
|
|
Configuration conf) throws IOException {
|
|
|
//default policy
|
|
|
- final RetryPolicy defaultPolicy = getDefaultRetryPolicy(conf);
|
|
|
-
|
|
|
+ final RetryPolicy defaultPolicy =
|
|
|
+ RetryUtils.getDefaultRetryPolicy(
|
|
|
+ conf,
|
|
|
+ DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_ENABLED_KEY,
|
|
|
+ DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_ENABLED_DEFAULT,
|
|
|
+ DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_SPEC_KEY,
|
|
|
+ DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_SPEC_DEFAULT,
|
|
|
+ SafeModeException.class
|
|
|
+ );
|
|
|
+
|
|
|
//create policy
|
|
|
RetryPolicy createPolicy = RetryPolicies.retryUpToMaximumCountWithFixedSleep(
|
|
|
5, LEASE_SOFTLIMIT_PERIOD, TimeUnit.MILLISECONDS);
|