
HDFS-3504. Support configurable retry policy in DFSClient for RPC connections and RPC calls, and add MultipleLinearRandomRetry, a new retry policy.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1349124 13f79535-47bb-0310-9956-ffa450edef68
Tsz-wo Sze, 13 years ago · parent commit 45fafc2b8f
16 changed files with 675 additions and 120 deletions
  1. +8 -9    hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java
  2. +230 -12 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicies.java
  3. +6 -0    hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicy.java
  4. +7 -4    hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryProxy.java
  5. +83 -39  hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
  6. +19 -9   hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
  7. +11 -8   hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java
  8. +3 -2    hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RemoteException.java
  9. +3 -1    hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcEngine.java
  10. +7 -1   hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java
  11. +18 -12 hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
  12. +4 -0   hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
  13. +4 -0   hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
  14. +107 -6 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java
  15. +4 -7   hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
  16. +161 -10 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java
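
As context for the diffs that follow, here is a minimal sketch of how a DFSClient user could enable the new retry policy via the two keys added in DFSConfigKeys.java below. The NameNode URI is illustrative, and the spec value shown is the shipped default:

    // Assumes the keys introduced by this commit; values are examples only.
    Configuration conf = new HdfsConfiguration();
    conf.setBoolean("dfs.client.retry.policy.enabled", true);   // off by default
    // Pairs of sleep-time-ms,num-retries: ~10s x 6 retries, then ~60s x 10 retries.
    conf.set("dfs.client.retry.policy.spec", "10000,6,60000,10");
    FileSystem fs = FileSystem.get(URI.create("hdfs://nn.example.com:8020/"), conf);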

+ 8 - 9
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java

@@ -33,7 +33,7 @@ import org.apache.hadoop.ipc.RpcInvocationHandler;
 
 class RetryInvocationHandler implements RpcInvocationHandler {
   public static final Log LOG = LogFactory.getLog(RetryInvocationHandler.class);
-  private FailoverProxyProvider proxyProvider;
+  private final FailoverProxyProvider proxyProvider;
 
   /**
    * The number of times the associated proxyProvider has ever been failed over.
@@ -41,26 +41,25 @@ class RetryInvocationHandler implements RpcInvocationHandler {
   private long proxyProviderFailoverCount = 0;
   private volatile boolean hasMadeASuccessfulCall = false;
   
-  private RetryPolicy defaultPolicy;
-  private Map<String,RetryPolicy> methodNameToPolicyMap;
+  private final RetryPolicy defaultPolicy;
+  private final Map<String,RetryPolicy> methodNameToPolicyMap;
   private Object currentProxy;
   
   public RetryInvocationHandler(FailoverProxyProvider proxyProvider,
       RetryPolicy retryPolicy) {
-    this.proxyProvider = proxyProvider;
-    this.defaultPolicy = retryPolicy;
-    this.methodNameToPolicyMap = Collections.emptyMap();
-    this.currentProxy = proxyProvider.getProxy();
+    this(proxyProvider, retryPolicy, Collections.<String, RetryPolicy>emptyMap());
   }
-  
+
   public RetryInvocationHandler(FailoverProxyProvider proxyProvider,
+      RetryPolicy defaultPolicy,
       Map<String, RetryPolicy> methodNameToPolicyMap) {
     this.proxyProvider = proxyProvider;
-    this.defaultPolicy = RetryPolicies.TRY_ONCE_THEN_FAIL;
+    this.defaultPolicy = defaultPolicy;
     this.methodNameToPolicyMap = methodNameToPolicyMap;
     this.currentProxy = proxyProvider.getProxy();
   }
 
+  @Override
   public Object invoke(Object proxy, Method method, Object[] args)
     throws Throwable {
     RetryPolicy policy = methodNameToPolicyMap.get(method.getName());
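
Immediately after this lookup (past the end of the hunk), invoke() falls back to the default policy when no per-method policy is registered; paraphrased:

    RetryPolicy policy = methodNameToPolicyMap.get(method.getName());
    if (policy == null) {
      policy = defaultPolicy;  // now supplied by the caller instead of hard-coded
    }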

+ 230 - 12
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicies.java

@@ -22,10 +22,13 @@ import java.net.ConnectException;
 import java.net.NoRouteToHostException;
 import java.net.SocketException;
 import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
-import java.util.Random;
 import java.util.Map.Entry;
+import java.util.Random;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.logging.Log;
@@ -33,8 +36,6 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.ipc.StandbyException;
 
-import com.google.common.annotations.VisibleForTesting;
-
 /**
  * <p>
  * A collection of useful implementations of {@link RetryPolicy}.
@@ -44,7 +45,12 @@ public class RetryPolicies {
   
   public static final Log LOG = LogFactory.getLog(RetryPolicies.class);
   
-  private static final Random RAND = new Random();
+  private static ThreadLocal<Random> RANDOM = new ThreadLocal<Random>() {
+    @Override
+    protected Random initialValue() {
+      return new Random();
+    }
+  };
   
   /**
    * <p>
@@ -157,17 +163,35 @@ public class RetryPolicies {
     }
   }
   
+  /**
+   * Retry up to maxRetries.
+   * The actual sleep time of the n-th retry is f(n, sleepTime),
+   * where f is a function provided by the subclass implementation.
+   *
+   * Objects of the subclasses should be immutable;
+   * otherwise, the subclass must override hashCode(), equals(..) and toString().
+   */
   static abstract class RetryLimited implements RetryPolicy {
-    int maxRetries;
-    long sleepTime;
-    TimeUnit timeUnit;
+    final int maxRetries;
+    final long sleepTime;
+    final TimeUnit timeUnit;
     
-    public RetryLimited(int maxRetries, long sleepTime, TimeUnit timeUnit) {
+    private String myString;
+
+    RetryLimited(int maxRetries, long sleepTime, TimeUnit timeUnit) {
+      if (maxRetries < 0) {
+        throw new IllegalArgumentException("maxRetries = " + maxRetries+" < 0");
+      }
+      if (sleepTime < 0) {
+        throw new IllegalArgumentException("sleepTime = " + sleepTime + " < 0");
+      }
+
       this.maxRetries = maxRetries;
       this.sleepTime = sleepTime;
       this.timeUnit = timeUnit;
     }
 
+    @Override
     public RetryAction shouldRetry(Exception e, int retries, int failovers,
         boolean isMethodIdempotent) throws Exception {
       if (retries >= maxRetries) {
@@ -178,6 +202,30 @@ public class RetryPolicies {
     }
     
     protected abstract long calculateSleepTime(int retries);
+    
+    @Override
+    public int hashCode() {
+      return toString().hashCode();
+    }
+    
+    @Override
+    public boolean equals(final Object that) {
+      if (this == that) {
+        return true;
+      } else if (that == null || this.getClass() != that.getClass()) {
+        return false;
+      }
+      return this.toString().equals(that.toString());
+    }
+
+    @Override
+    public String toString() {
+      if (myString == null) {
+        myString = getClass().getSimpleName() + "(maxRetries=" + maxRetries
+            + ", sleepTime=" + sleepTime + " " + timeUnit + ")";
+      }
+      return myString;
+    }
   }
   
   static class RetryUpToMaximumCountWithFixedSleep extends RetryLimited {
@@ -208,6 +256,169 @@ public class RetryPolicies {
     }
   }
   
+  /**
+   * Given pairs of number of retries and sleep time (n0, t0), (n1, t1), ...,
+   * the first n0 retries sleep t0 milliseconds on average,
+   * the following n1 retries sleep t1 milliseconds on average, and so on.
+   * 
+   * For each sleep, the actual sleep time is uniformly distributed
+   * in the closed interval [0.5t, 1.5t], where t is the sleep time specified.
+   *
+   * The objects of this class are immutable.
+   */
+  public static class MultipleLinearRandomRetry implements RetryPolicy {
+    /** Pairs of numRetries and sleepMillis */
+    public static class Pair {
+      final int numRetries;
+      final int sleepMillis;
+      
+      public Pair(final int numRetries, final int sleepMillis) {
+        if (numRetries < 0) {
+          throw new IllegalArgumentException("numRetries = " + numRetries+" < 0");
+        }
+        if (sleepMillis < 0) {
+          throw new IllegalArgumentException("sleepMillis = " + sleepMillis + " < 0");
+        }
+
+        this.numRetries = numRetries;
+        this.sleepMillis = sleepMillis;
+      }
+      
+      @Override
+      public String toString() {
+        return numRetries + "x" + sleepMillis + "ms";
+      }
+    }
+
+    private final List<Pair> pairs;
+    private String myString;
+
+    public MultipleLinearRandomRetry(List<Pair> pairs) {
+      if (pairs == null || pairs.isEmpty()) {
+        throw new IllegalArgumentException("pairs must be neither null nor empty.");
+      }
+      this.pairs = Collections.unmodifiableList(pairs);
+    }
+
+    @Override
+    public RetryAction shouldRetry(Exception e, int curRetry, int failovers,
+        boolean isMethodIdempotent) throws Exception {
+      final Pair p = searchPair(curRetry);
+      if (p == null) {
+        //no more retries.
+        return RetryAction.FAIL;
+      }
+
+      //calculate sleep time and return.
+      final double ratio = RANDOM.get().nextDouble() + 0.5;//0.5 <= ratio <=1.5
+      final long sleepTime = Math.round(p.sleepMillis * ratio);
+      return new RetryAction(RetryAction.RetryDecision.RETRY, sleepTime);
+    }
+
+    /**
+     * Given the current retry count, search for the corresponding pair.
+     * @return the corresponding pair,
+     *   or null if the current retry count exceeds the maximum number of retries.
+     */
+    private Pair searchPair(int curRetry) {
+      int i = 0;
+      for(; i < pairs.size() && curRetry > pairs.get(i).numRetries; i++) {
+        curRetry -= pairs.get(i).numRetries;
+      }
+      return i == pairs.size()? null: pairs.get(i);
+    }
+    
+    @Override
+    public int hashCode() {
+      return toString().hashCode();
+    }
+    
+    @Override
+    public boolean equals(final Object that) {
+      if (this == that) {
+        return true;
+      } else if (that == null || this.getClass() != that.getClass()) {
+        return false;
+      }
+      return this.toString().equals(that.toString());
+    }
+
+    @Override
+    public String toString() {
+      if (myString == null) {
+        myString = getClass().getSimpleName() + pairs;
+      }
+      return myString;
+    }
+
+    /**
+     * Parse the given string as a MultipleLinearRandomRetry object.
+     * The format of the string is "t_1, n_1, t_2, n_2, ...",
+     * where t_i and n_i are the i-th pair of sleep time and number of retries.
+     * Note that whitespace in the string is ignored.
+     *
+     * @return the parsed object, or null if the parsing fails.
+     */
+    public static MultipleLinearRandomRetry parseCommaSeparatedString(String s) {
+      final String[] elements = s.split(",");
+      if (elements.length == 0) {
+        LOG.warn("Illegal value: there is no element in \"" + s + "\".");
+        return null;
+      }
+      if (elements.length % 2 != 0) {
+        LOG.warn("Illegal value: the number of elements in \"" + s + "\" is "
+            + elements.length + " but an even number of elements is expected.");
+        return null;
+      }
+
+      final List<RetryPolicies.MultipleLinearRandomRetry.Pair> pairs
+          = new ArrayList<RetryPolicies.MultipleLinearRandomRetry.Pair>();
+   
+      for(int i = 0; i < elements.length; ) {
+        //parse the i-th sleep-time
+        final int sleep = parsePositiveInt(elements, i++, s);
+        if (sleep == -1) {
+          return null; //parse fails
+        }
+
+        //parse the i-th number-of-retries
+        final int retries = parsePositiveInt(elements, i++, s);
+        if (retries == -1) {
+          return null; //parse fails
+        }
+
+        pairs.add(new RetryPolicies.MultipleLinearRandomRetry.Pair(retries, sleep));
+      }
+      return new RetryPolicies.MultipleLinearRandomRetry(pairs);
+    }
+
+    /**
+     * Parse the i-th element as an integer.
+     * @return -1 if the parsing fails or the parsed value <= 0;
+     *   otherwise, return the parsed value.
+     */
+    private static int parsePositiveInt(final String[] elements,
+        final int i, final String originalString) {
+      final String s = elements[i].trim();
+      final int n;
+      try {
+        n = Integer.parseInt(s);
+      } catch(NumberFormatException nfe) {
+        LOG.warn("Failed to parse \"" + s + "\", which is the index " + i
+            + " element in \"" + originalString + "\"", nfe);
+        return -1;
+      }
+
+      if (n <= 0) {
+        LOG.warn("The value " + n + " <= 0: it is parsed from the string \""
+            + s + "\" which is the index " + i + " element in \""
+            + originalString + "\"");
+        return -1;
+      }
+      return n;
+    }
+  }
+
   static class ExceptionDependentRetry implements RetryPolicy {
 
     RetryPolicy defaultPolicy;
@@ -265,6 +476,14 @@ public class RetryPolicies {
     public ExponentialBackoffRetry(
         int maxRetries, long sleepTime, TimeUnit timeUnit) {
       super(maxRetries, sleepTime, timeUnit);
+
+      if (maxRetries < 0) {
+        throw new IllegalArgumentException("maxRetries = " + maxRetries + " < 0");
+      } else if (maxRetries >= Long.SIZE - 1) {
+        //calculateSleepTime may overflow. 
+        throw new IllegalArgumentException("maxRetries = " + maxRetries
+            + " >= " + (Long.SIZE - 1));
+      }
     }
     
     @Override
@@ -353,11 +572,10 @@ public class RetryPolicies {
    * @param cap value at which to cap the base sleep time
    * @return an amount of time to sleep
    */
-  @VisibleForTesting
-  public static long calculateExponentialTime(long time, int retries,
+  private static long calculateExponentialTime(long time, int retries,
       long cap) {
-    long baseTime = Math.min(time * ((long)1 << retries), cap);
-    return (long) (baseTime * (RAND.nextFloat() + 0.5));
+    long baseTime = Math.min(time * (1L << retries), cap);
+    return (long) (baseTime * (RANDOM.get().nextDouble() + 0.5));
   }
 
   private static long calculateExponentialTime(long time, int retries) {
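
A small usage sketch of the new policy added above; the spec string is arbitrary:

    // "t1,n1,t2,n2,...": 6 retries sleeping ~10s each, then 10 retries sleeping ~60s each.
    RetryPolicy p = RetryPolicies.MultipleLinearRandomRetry
        .parseCommaSeparatedString("10000,6,60000,10");
    // Returns null on a malformed spec. Each actual sleep is uniformly random in
    // [0.5t, 1.5t]; p.toString() yields "MultipleLinearRandomRetry[6x10000ms, 10x60000ms]".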

+ 6 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicy.java

@@ -60,6 +60,12 @@ public interface RetryPolicy {
       this.reason = reason;
     }
     
+    @Override
+    public String toString() {
+      return getClass().getSimpleName() + "(action=" + action
+          + ", delayMillis=" + delayMillis + ", reason=" + reason + ")";
+    }
+    
     public enum RetryDecision {
       FAIL,
       RETRY,

+ 7 - 4
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryProxy.java

@@ -75,9 +75,10 @@ public class RetryProxy {
    */
   public static Object create(Class<?> iface, Object implementation,
                               Map<String,RetryPolicy> methodNameToPolicyMap) {
-    return RetryProxy.create(iface,
+    return create(iface,
         new DefaultFailoverProxyProvider(iface, implementation),
-        methodNameToPolicyMap);
+        methodNameToPolicyMap,
+        RetryPolicies.TRY_ONCE_THEN_FAIL);
   }
 
   /**
@@ -92,11 +93,13 @@ public class RetryProxy {
    * @return the retry proxy
    */
   public static Object create(Class<?> iface, FailoverProxyProvider proxyProvider,
-      Map<String,RetryPolicy> methodNameToPolicyMap) {
+      Map<String,RetryPolicy> methodNameToPolicyMap,
+      RetryPolicy defaultPolicy) {
     return Proxy.newProxyInstance(
         proxyProvider.getInterface().getClassLoader(),
         new Class<?>[] { iface },
-        new RetryInvocationHandler(proxyProvider, methodNameToPolicyMap)
+        new RetryInvocationHandler(proxyProvider, defaultPolicy,
+            methodNameToPolicyMap)
         );
   }
 }
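
A sketch of the updated factory method in use; MyProtocol, underlying, createPolicy and defaultPolicy are hypothetical placeholders:

    Map<String, RetryPolicy> methodPolicies = new HashMap<String, RetryPolicy>();
    methodPolicies.put("create", createPolicy);      // per-method override
    MyProtocol proxy = (MyProtocol) RetryProxy.create(
        MyProtocol.class,
        new DefaultFailoverProxyProvider(MyProtocol.class, underlying),
        methodPolicies,
        defaultPolicy);  // applied to every method without an explicit entry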

+ 83 - 39
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java

@@ -18,47 +18,51 @@
 
 package org.apache.hadoop.ipc;
 
-import java.net.InetAddress;
-import java.net.Socket;
-import java.net.InetSocketAddress;
-import java.net.SocketTimeoutException;
-import java.net.UnknownHostException;
-import java.io.IOException;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
 import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
 import java.io.FilterInputStream;
+import java.io.IOException;
 import java.io.InputStream;
+import java.io.InterruptedIOException;
 import java.io.OutputStream;
-
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.SocketTimeoutException;
+import java.net.UnknownHostException;
 import java.security.PrivilegedExceptionAction;
 import java.util.Hashtable;
 import java.util.Iterator;
+import java.util.Map.Entry;
 import java.util.Random;
 import java.util.Set;
-import java.util.Map.Entry;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
 import javax.net.SocketFactory;
 
-import org.apache.commons.logging.*;
-
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.io.retry.RetryPolicies;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.io.retry.RetryPolicy.RetryAction;
 import org.apache.hadoop.ipc.protobuf.IpcConnectionContextProtos.IpcConnectionContextProto;
 import org.apache.hadoop.ipc.protobuf.RpcPayloadHeaderProtos.RpcPayloadHeaderProto;
 import org.apache.hadoop.ipc.protobuf.RpcPayloadHeaderProtos.RpcPayloadOperationProto;
 import org.apache.hadoop.ipc.protobuf.RpcPayloadHeaderProtos.RpcResponseHeaderProto;
 import org.apache.hadoop.ipc.protobuf.RpcPayloadHeaderProtos.RpcStatusProto;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableUtils;
-import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.KerberosInfo;
 import org.apache.hadoop.security.SaslRpcClient;
@@ -67,8 +71,8 @@ import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.hadoop.security.token.TokenSelector;
 import org.apache.hadoop.security.token.TokenInfo;
+import org.apache.hadoop.security.token.TokenSelector;
 import org.apache.hadoop.util.ProtoUtil;
 import org.apache.hadoop.util.ReflectionUtils;
 
@@ -80,8 +84,8 @@ import org.apache.hadoop.util.ReflectionUtils;
  */
 public class Client {
   
-  public static final Log LOG =
-    LogFactory.getLog(Client.class);
+  public static final Log LOG = LogFactory.getLog(Client.class);
+
   private Hashtable<ConnectionId, Connection> connections =
     new Hashtable<ConnectionId, Connection>();
 
@@ -228,8 +232,7 @@ public class Client {
     private int rpcTimeout;
     private int maxIdleTime; //connections will be culled if it was idle for 
     //maxIdleTime msecs
-    private int maxRetries; //the max. no. of retries for socket connections
-    // the max. no. of retries for socket connections on time out exceptions
+    private final RetryPolicy connectionRetryPolicy;
     private int maxRetriesOnSocketTimeouts;
     private boolean tcpNoDelay; // if T then disable Nagle's Algorithm
     private boolean doPing; //do we need to send ping message
@@ -253,7 +256,7 @@ public class Client {
       }
       this.rpcTimeout = remoteId.getRpcTimeout();
       this.maxIdleTime = remoteId.getMaxIdleTime();
-      this.maxRetries = remoteId.getMaxRetries();
+      this.connectionRetryPolicy = remoteId.connectionRetryPolicy;
       this.maxRetriesOnSocketTimeouts = remoteId.getMaxRetriesOnSocketTimeouts();
       this.tcpNoDelay = remoteId.getTcpNoDelay();
       this.doPing = remoteId.getDoPing();
@@ -488,7 +491,7 @@ public class Client {
           if (updateAddress()) {
             timeoutFailures = ioFailures = 0;
           }
-          handleConnectionFailure(ioFailures++, maxRetries, ie);
+          handleConnectionFailure(ioFailures++, ie);
         }
       }
     }
@@ -680,8 +683,36 @@ public class Client {
         Thread.sleep(1000);
       } catch (InterruptedException ignored) {}
       
-      LOG.info("Retrying connect to server: " + server + 
-          ". Already tried " + curRetries + " time(s).");
+      LOG.info("Retrying connect to server: " + server + ". Already tried "
+          + curRetries + " time(s); maxRetries=" + maxRetries);
+    }
+
+    private void handleConnectionFailure(int curRetries, IOException ioe
+        ) throws IOException {
+      closeConnection();
+
+      final RetryAction action;
+      try {
+        action = connectionRetryPolicy.shouldRetry(ioe, curRetries, 0, true);
+      } catch(Exception e) {
+        throw e instanceof IOException? (IOException)e: new IOException(e);
+      }
+      if (action.action == RetryAction.RetryDecision.FAIL) {
+        if (action.reason != null) {
+          LOG.warn("Failed to connect to server: " + server + ": "
+              + action.reason, ioe);
+        }
+        throw ioe;
+      }
+
+      try {
+        Thread.sleep(action.delayMillis);
+      } catch (InterruptedException e) {
+        throw (IOException)new InterruptedIOException("Interrupted: action="
+            + action + ", retry policy=" + connectionRetryPolicy).initCause(e);
+      }
+      LOG.info("Retrying connect to server: " + server + ". Already tried "
+          + curRetries + " time(s); retry policy is " + connectionRetryPolicy);
     }
 
     /**
@@ -849,6 +880,10 @@ public class Client {
       try {
         RpcResponseHeaderProto response = 
             RpcResponseHeaderProto.parseDelimitedFrom(in);
+        if (response == null) {
+          throw new IOException("Response is null.");
+        }
+
         int callId = response.getCallId();
         if (LOG.isDebugEnabled())
           LOG.debug(getName() + " got value #" + callId);
@@ -1287,7 +1322,7 @@ public class Client {
     private final String serverPrincipal;
     private final int maxIdleTime; //connections will be culled if it was idle for 
     //maxIdleTime msecs
-    private final int maxRetries; //the max. no. of retries for socket connections
+    private final RetryPolicy connectionRetryPolicy;
     // the max. no. of retries for socket connections on time out exceptions
     private final int maxRetriesOnSocketTimeouts;
     private final boolean tcpNoDelay; // if T then disable Nagle's Algorithm
@@ -1297,7 +1332,7 @@ public class Client {
     ConnectionId(InetSocketAddress address, Class<?> protocol, 
                  UserGroupInformation ticket, int rpcTimeout,
                  String serverPrincipal, int maxIdleTime, 
-                 int maxRetries, int maxRetriesOnSocketTimeouts,
+                 RetryPolicy connectionRetryPolicy, int maxRetriesOnSocketTimeouts,
                  boolean tcpNoDelay, boolean doPing, int pingInterval) {
       this.protocol = protocol;
       this.address = address;
@@ -1305,7 +1340,7 @@ public class Client {
       this.rpcTimeout = rpcTimeout;
       this.serverPrincipal = serverPrincipal;
       this.maxIdleTime = maxIdleTime;
-      this.maxRetries = maxRetries;
+      this.connectionRetryPolicy = connectionRetryPolicy;
       this.maxRetriesOnSocketTimeouts = maxRetriesOnSocketTimeouts;
       this.tcpNoDelay = tcpNoDelay;
       this.doPing = doPing;
@@ -1336,10 +1371,6 @@ public class Client {
       return maxIdleTime;
     }
     
-    int getMaxRetries() {
-      return maxRetries;
-    }
-    
     /** max connection retries on socket time outs */
     public int getMaxRetriesOnSocketTimeouts() {
       return maxRetriesOnSocketTimeouts;
@@ -1357,6 +1388,12 @@ public class Client {
       return pingInterval;
     }
     
+    static ConnectionId getConnectionId(InetSocketAddress addr,
+        Class<?> protocol, UserGroupInformation ticket, int rpcTimeout,
+        Configuration conf) throws IOException {
+      return getConnectionId(addr, protocol, ticket, rpcTimeout, null, conf);
+    }
+
     /**
      * Returns a ConnectionId object. 
      * @param addr Remote address for the connection.
@@ -1367,9 +1404,18 @@ public class Client {
      * @return A ConnectionId instance
      * @throws IOException
      */
-    public static ConnectionId getConnectionId(InetSocketAddress addr,
+    static ConnectionId getConnectionId(InetSocketAddress addr,
         Class<?> protocol, UserGroupInformation ticket, int rpcTimeout,
-        Configuration conf) throws IOException {
+        RetryPolicy connectionRetryPolicy, Configuration conf) throws IOException {
+
+      if (connectionRetryPolicy == null) {
+        final int max = conf.getInt(
+            CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY,
+            CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_DEFAULT);
+        connectionRetryPolicy = RetryPolicies.retryUpToMaximumCountWithFixedSleep(
+            max, 1, TimeUnit.SECONDS);
+      }
+
       String remotePrincipal = getRemotePrincipal(conf, addr, protocol);
       boolean doPing =
         conf.getBoolean(CommonConfigurationKeys.IPC_CLIENT_PING_KEY, true);
@@ -1377,8 +1423,7 @@ public class Client {
           rpcTimeout, remotePrincipal,
           conf.getInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_KEY,
               CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_DEFAULT),
-          conf.getInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY,
-              CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_DEFAULT),
+          connectionRetryPolicy,
           conf.getInt(
             CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY,
             CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_DEFAULT),
@@ -1421,7 +1466,7 @@ public class Client {
         return isEqual(this.address, that.address)
             && this.doPing == that.doPing
             && this.maxIdleTime == that.maxIdleTime
-            && this.maxRetries == that.maxRetries
+            && isEqual(this.connectionRetryPolicy, that.connectionRetryPolicy)
             && this.pingInterval == that.pingInterval
             && isEqual(this.protocol, that.protocol)
             && this.rpcTimeout == that.rpcTimeout
@@ -1434,11 +1479,10 @@ public class Client {
     
     @Override
     public int hashCode() {
-      int result = 1;
+      int result = connectionRetryPolicy.hashCode();
       result = PRIME * result + ((address == null) ? 0 : address.hashCode());
       result = PRIME * result + (doPing ? 1231 : 1237);
       result = PRIME * result + maxIdleTime;
-      result = PRIME * result + maxRetries;
       result = PRIME * result + pingInterval;
       result = PRIME * result + ((protocol == null) ? 0 : protocol.hashCode());
       result = PRIME * result + rpcTimeout;
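
When a null policy is passed to getConnectionId, the fallback above reproduces the old fixed-sleep behavior; written out (key name and default taken from CommonConfigurationKeysPublic):

    // Retry up to ipc.client.connect.max.retries times (10 by default),
    // sleeping 1 second between connection attempts.
    RetryPolicy fallback = RetryPolicies.retryUpToMaximumCountWithFixedSleep(
        conf.getInt("ipc.client.connect.max.retries", 10), 1, TimeUnit.SECONDS);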

+ 19 - 9
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java

@@ -36,9 +36,9 @@ import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.DataOutputOutputStream;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.retry.RetryPolicy;
 import org.apache.hadoop.ipc.Client.ConnectionId;
 import org.apache.hadoop.ipc.RPC.RpcInvoker;
-
 import org.apache.hadoop.ipc.protobuf.HadoopRpcProtos.HadoopRpcRequestProto;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.SecretManager;
@@ -66,15 +66,24 @@ public class ProtobufRpcEngine implements RpcEngine {
 
   private static final ClientCache CLIENTS = new ClientCache();
 
+  public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
+      InetSocketAddress addr, UserGroupInformation ticket, Configuration conf,
+      SocketFactory factory, int rpcTimeout) throws IOException {
+    return getProxy(protocol, clientVersion, addr, ticket, conf, factory,
+        rpcTimeout, null);
+  }
+
   @Override
   @SuppressWarnings("unchecked")
   public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
       InetSocketAddress addr, UserGroupInformation ticket, Configuration conf,
-      SocketFactory factory, int rpcTimeout) throws IOException {
+      SocketFactory factory, int rpcTimeout, RetryPolicy connectionRetryPolicy
+      ) throws IOException {
 
-    return new ProtocolProxy<T>(protocol, (T) Proxy.newProxyInstance(protocol
-        .getClassLoader(), new Class[] { protocol }, new Invoker(protocol,
-        addr, ticket, conf, factory, rpcTimeout)), false);
+    final Invoker invoker = new Invoker(protocol, addr, ticket, conf, factory,
+        rpcTimeout, connectionRetryPolicy);
+    return new ProtocolProxy<T>(protocol, (T) Proxy.newProxyInstance(
+        protocol.getClassLoader(), new Class[]{protocol}, invoker), false);
   }
   
   @Override
@@ -97,11 +106,12 @@ public class ProtobufRpcEngine implements RpcEngine {
     private final long clientProtocolVersion;
     private final String protocolName;
 
-    public Invoker(Class<?> protocol, InetSocketAddress addr,
+    private Invoker(Class<?> protocol, InetSocketAddress addr,
         UserGroupInformation ticket, Configuration conf, SocketFactory factory,
-        int rpcTimeout) throws IOException {
-      this(protocol, Client.ConnectionId.getConnectionId(addr, protocol,
-          ticket, rpcTimeout, conf), conf, factory);
+        int rpcTimeout, RetryPolicy connectionRetryPolicy) throws IOException {
+      this(protocol, Client.ConnectionId.getConnectionId(
+          addr, protocol, ticket, rpcTimeout, connectionRetryPolicy, conf),
+          conf, factory);
     }
     
     /**

+ 11 - 8
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java

@@ -41,6 +41,7 @@ import org.apache.commons.logging.*;
 
 import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.io.*;
+import org.apache.hadoop.io.retry.RetryPolicy;
 import org.apache.hadoop.ipc.Client.ConnectionId;
 import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.ProtocolInfoService;
 import org.apache.hadoop.net.NetUtils;
@@ -326,7 +327,7 @@ public class RPC {
                              long clientVersion,
                              InetSocketAddress addr, Configuration conf,
                              long connTimeout) throws IOException { 
-    return waitForProtocolProxy(protocol, clientVersion, addr, conf, 0, connTimeout);
+    return waitForProtocolProxy(protocol, clientVersion, addr, conf, 0, null, connTimeout);
   }
   
   /**
@@ -347,7 +348,7 @@ public class RPC {
                              int rpcTimeout,
                              long timeout) throws IOException {
     return waitForProtocolProxy(protocol, clientVersion, addr,
-        conf, rpcTimeout, timeout).getProxy();
+        conf, rpcTimeout, null, timeout).getProxy();
   }
 
   /**
@@ -367,6 +368,7 @@ public class RPC {
                                long clientVersion,
                                InetSocketAddress addr, Configuration conf,
                                int rpcTimeout,
+                               RetryPolicy connectionRetryPolicy,
                                long timeout) throws IOException { 
     long startTime = System.currentTimeMillis();
     IOException ioe;
@@ -374,7 +376,7 @@ public class RPC {
       try {
         return getProtocolProxy(protocol, clientVersion, addr, 
             UserGroupInformation.getCurrentUser(), conf, NetUtils
-            .getDefaultSocketFactory(conf), rpcTimeout);
+            .getDefaultSocketFactory(conf), rpcTimeout, connectionRetryPolicy);
       } catch(ConnectException se) {  // namenode has not been started
         LOG.info("Server at " + addr + " not available yet, Zzzzz...");
         ioe = se;
@@ -463,7 +465,7 @@ public class RPC {
                                 Configuration conf,
                                 SocketFactory factory) throws IOException {
     return getProtocolProxy(
-        protocol, clientVersion, addr, ticket, conf, factory, 0);
+        protocol, clientVersion, addr, ticket, conf, factory, 0, null);
   }
   
   /**
@@ -489,7 +491,7 @@ public class RPC {
                                 SocketFactory factory,
                                 int rpcTimeout) throws IOException {
     return getProtocolProxy(protocol, clientVersion, addr, ticket,
-             conf, factory, rpcTimeout).getProxy();
+             conf, factory, rpcTimeout, null).getProxy();
   }
   
   /**
@@ -512,12 +514,13 @@ public class RPC {
                                 UserGroupInformation ticket,
                                 Configuration conf,
                                 SocketFactory factory,
-                                int rpcTimeout) throws IOException {    
+                                int rpcTimeout,
+                                RetryPolicy connectionRetryPolicy) throws IOException {    
     if (UserGroupInformation.isSecurityEnabled()) {
       SaslRpcServer.init(conf);
     }
-    return getProtocolEngine(protocol,conf).getProxy(protocol,
-        clientVersion, addr, ticket, conf, factory, rpcTimeout);
+    return getProtocolEngine(protocol,conf).getProxy(protocol, clientVersion,
+        addr, ticket, conf, factory, rpcTimeout, connectionRetryPolicy);
   }
 
    /**
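
A sketch of obtaining a proxy through the new getProtocolProxy overload, mirroring how NameNodeProxies.java (below) uses it; nnAddress and connectionRetryPolicy are placeholders:

    ProtocolProxy<ClientNamenodeProtocolPB> pp = RPC.getProtocolProxy(
        ClientNamenodeProtocolPB.class,
        RPC.getProtocolVersion(ClientNamenodeProtocolPB.class),
        nnAddress, UserGroupInformation.getCurrentUser(), conf,
        NetUtils.getDefaultSocketFactory(conf),
        0,                       // rpcTimeout
        connectionRetryPolicy);  // null falls back to the configured default
    ClientNamenodeProtocolPB proxy = pp.getProxy();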

+ 3 - 2
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RemoteException.java

@@ -97,8 +97,9 @@ public class RemoteException extends IOException {
     return new RemoteException(attrs.getValue("class"),
         attrs.getValue("message")); 
   }
-  
+
+  @Override
   public String toString() {
-    return className + ": " + getMessage();
+    return getClass().getName() + "(" + className + "): " + getMessage();
   }
 }

+ 3 - 1
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcEngine.java

@@ -26,6 +26,7 @@ import javax.net.SocketFactory;
 
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.retry.RetryPolicy;
 import org.apache.hadoop.ipc.Client.ConnectionId;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.SecretManager;
@@ -40,7 +41,8 @@ public interface RpcEngine {
   <T> ProtocolProxy<T> getProxy(Class<T> protocol,
                   long clientVersion, InetSocketAddress addr,
                   UserGroupInformation ticket, Configuration conf,
-                  SocketFactory factory, int rpcTimeout) throws IOException;
+                  SocketFactory factory, int rpcTimeout,
+                  RetryPolicy connectionRetryPolicy) throws IOException;
 
   /** Expert: Make multiple, parallel calls to a set of servers. */
   Object[] call(Method method, Object[][] params, InetSocketAddress[] addrs,

+ 7 - 1
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java

@@ -31,6 +31,7 @@ import javax.net.SocketFactory;
 import org.apache.commons.logging.*;
 
 import org.apache.hadoop.io.*;
+import org.apache.hadoop.io.retry.RetryPolicy;
 import org.apache.hadoop.ipc.Client.ConnectionId;
 import org.apache.hadoop.ipc.RPC.RpcInvoker;
 import org.apache.hadoop.ipc.VersionedProtocol;
@@ -259,9 +260,14 @@ public class WritableRpcEngine implements RpcEngine {
   public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
                          InetSocketAddress addr, UserGroupInformation ticket,
                          Configuration conf, SocketFactory factory,
-                         int rpcTimeout)
+                         int rpcTimeout, RetryPolicy connectionRetryPolicy)
     throws IOException {    
 
+    if (connectionRetryPolicy != null) {
+      throw new UnsupportedOperationException(
+          "Not supported: connectionRetryPolicy=" + connectionRetryPolicy);
+    }
+
     T proxy = (T) Proxy.newProxyInstance(protocol.getClassLoader(),
         new Class[] { protocol }, new Invoker(protocol, addr, ticket, conf,
             factory, rpcTimeout));

+ 18 - 12
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java

@@ -18,50 +18,55 @@
 
 package org.apache.hadoop.ipc;
 
+import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
+import static org.apache.hadoop.test.MetricsAsserts.assertCounterGt;
+import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
 import java.io.Closeable;
 import java.io.IOException;
-import java.net.ConnectException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
 import java.lang.management.ManagementFactory;
 import java.lang.management.ThreadInfo;
 import java.lang.management.ThreadMXBean;
 import java.lang.reflect.InvocationHandler;
 import java.lang.reflect.Method;
 import java.lang.reflect.Proxy;
+import java.net.ConnectException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
 import java.util.Arrays;
 
 import javax.net.SocketFactory;
 
-import org.apache.commons.logging.*;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.io.UTF8;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.retry.RetryPolicies;
+import org.apache.hadoop.io.retry.RetryPolicy;
 import org.apache.hadoop.io.retry.RetryProxy;
 import org.apache.hadoop.ipc.Client.ConnectionId;
-import org.apache.hadoop.ipc.TestSaslRPC.TestSaslImpl;
-import org.apache.hadoop.ipc.TestSaslRPC.TestSaslProtocol;
 import org.apache.hadoop.metrics2.MetricsRecordBuilder;
 import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.AuthorizationException;
 import org.apache.hadoop.security.authorize.PolicyProvider;
 import org.apache.hadoop.security.authorize.Service;
 import org.apache.hadoop.security.token.SecretManager;
 import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.hadoop.security.AccessControlException;
-import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.test.MockitoUtil;
 import org.junit.Test;
-import static org.junit.Assert.*;
 
 import com.google.protobuf.DescriptorProtos;
 import com.google.protobuf.DescriptorProtos.EnumDescriptorProto;
 
-import static org.apache.hadoop.test.MetricsAsserts.*;
-
 /** Unit tests for RPC. */
 @SuppressWarnings("deprecation")
 public class TestRPC {
@@ -250,7 +255,8 @@ public class TestRPC {
     @Override
     public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
         InetSocketAddress addr, UserGroupInformation ticket, Configuration conf,
-        SocketFactory factory, int rpcTimeout) throws IOException {
+        SocketFactory factory, int rpcTimeout, RetryPolicy connectionRetryPolicy
+        ) throws IOException {
       T proxy = (T) Proxy.newProxyInstance(protocol.getClassLoader(),
               new Class[] { protocol }, new StoppedInvocationHandler());
       return new ProtocolProxy<T>(protocol, proxy, false);

+ 4 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -222,6 +222,10 @@ Branch-2 ( Unreleased changes )
 
     HDFS-3520. Add transfer rate logging to TransferFsImage. (eli)
 
+    HDFS-3504. Support configurable retry policy in DFSClient for RPC
+    connections and RPC calls, and add MultipleLinearRandomRetry, a new retry
+    policy.  (szetszwo)
+
   OPTIMIZATIONS
 
     HDFS-2982. Startup performance suffers when there are many edit log

+ 4 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java

@@ -38,6 +38,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final int     DFS_STREAM_BUFFER_SIZE_DEFAULT = 4096;
   public static final String  DFS_BYTES_PER_CHECKSUM_KEY = "dfs.bytes-per-checksum";
   public static final int     DFS_BYTES_PER_CHECKSUM_DEFAULT = 512;
+  public static final String  DFS_CLIENT_RETRY_POLICY_ENABLED_KEY = "dfs.client.retry.policy.enabled";
+  public static final boolean DFS_CLIENT_RETRY_POLICY_ENABLED_DEFAULT = false; 
+  public static final String  DFS_CLIENT_RETRY_POLICY_SPEC_KEY = "dfs.client.retry.policy.spec";
+  public static final String  DFS_CLIENT_RETRY_POLICY_SPEC_DEFAULT = "10000,6,60000,10"; //t1,n1,t2,n2,... 
   public static final String  DFS_CHECKSUM_TYPE_KEY = "dfs.checksum.type";
   public static final String  DFS_CHECKSUM_TYPE_DEFAULT = "CRC32C";
   public static final String  DFS_CLIENT_WRITE_PACKET_SIZE_KEY = "dfs.client-write-packet-size";
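
Reading the default spec "10000,6,60000,10" as (sleep-time, num-retries) pairs: the first 6 retries each sleep about 10 seconds and the next 10 retries each sleep about 60 seconds (each sleep uniformly random in [0.5t, 1.5t]), so a failed call is retried for roughly 11 minutes on average before giving up.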

+ 107 - 6
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java

@@ -47,10 +47,12 @@ import org.apache.hadoop.hdfs.protocolPB.RefreshAuthorizationPolicyProtocolPB;
 import org.apache.hadoop.hdfs.protocolPB.RefreshUserMappingsProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdfs.protocolPB.RefreshUserMappingsProtocolPB;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.namenode.SafeModeException;
 import org.apache.hadoop.hdfs.server.protocol.JournalProtocol;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.retry.DefaultFailoverProxyProvider;
 import org.apache.hadoop.io.retry.FailoverProxyProvider;
 import org.apache.hadoop.io.retry.RetryPolicies;
 import org.apache.hadoop.io.retry.RetryPolicy;
@@ -66,6 +68,7 @@ import org.apache.hadoop.security.authorize.RefreshAuthorizationPolicyProtocol;
 import org.apache.hadoop.tools.GetUserMappingsProtocol;
 
 import com.google.common.base.Preconditions;
+import com.google.protobuf.ServiceException;
 
 /**
  * Create proxy objects to communicate with a remote NN. All remote access to an
@@ -240,12 +243,106 @@ public class NameNodeProxies {
     return new NamenodeProtocolTranslatorPB(proxy);
   }
   
+  /**
+   * Return the default retry policy used in RPC.
+   * 
+   * If dfs.client.retry.policy.enabled == false, use TRY_ONCE_THEN_FAIL.
+   * 
+   * Otherwise, first unwrap ServiceException if possible, and then 
+   * (1) use multipleLinearRandomRetry for
+   *     - SafeModeException, or
+   *     - IOException other than RemoteException, or
+   *     - ServiceException; and
+   * (2) use TRY_ONCE_THEN_FAIL for
+   *     - non-SafeMode RemoteException, or
+   *     - non-IOException.
+   *     
+   * Note that dfs.client.retry.max < 0 is not allowed.
+   */
+  private static RetryPolicy getDefaultRpcRetryPolicy(Configuration conf) {
+    final RetryPolicy multipleLinearRandomRetry = getMultipleLinearRandomRetry(conf);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("multipleLinearRandomRetry = " + multipleLinearRandomRetry);
+    }
+    if (multipleLinearRandomRetry == null) {
+      //no retry
+      return RetryPolicies.TRY_ONCE_THEN_FAIL;
+    } else {
+      return new RetryPolicy() {
+        @Override
+        public RetryAction shouldRetry(Exception e, int retries, int failovers,
+            boolean isMethodIdempotent) throws Exception {
+          if (e instanceof ServiceException) {
+            //unwrap ServiceException
+            final Throwable cause = e.getCause();
+            if (cause != null && cause instanceof Exception) {
+              e = (Exception)cause;
+            }
+          }
+
+          //see (1) and (2) in the javadoc of this method.
+          final RetryPolicy p;
+          if (e instanceof RemoteException) {
+            final RemoteException re = (RemoteException)e;
+            p = SafeModeException.class.getName().equals(re.getClassName())?
+                multipleLinearRandomRetry: RetryPolicies.TRY_ONCE_THEN_FAIL;
+          } else if (e instanceof IOException || e instanceof ServiceException) {
+            p = multipleLinearRandomRetry;
+          } else { //non-IOException
+            p = RetryPolicies.TRY_ONCE_THEN_FAIL;
+          }
+
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("RETRY " + retries + ") policy="
+                + p.getClass().getSimpleName() + ", exception=" + e);
+          }
+          LOG.info("RETRY " + retries + ") policy="
+              + p.getClass().getSimpleName() + ", exception=" + e);
+          return p.shouldRetry(e, retries, failovers, isMethodIdempotent);
+        }
+      };
+    }
+  }
+
+  /**
+   * Return the MultipleLinearRandomRetry policy specified in the conf,
+   * or null if the feature is disabled.
+   * If the policy is specified in the conf but the policy cannot be parsed,
+   * the default policy is returned.
+   * 
+   * Conf property: N pairs of sleep-time and number-of-retries
+   *   dfs.client.retry.policy.spec = "s1,n1,s2,n2,..."
+   */
+  private static RetryPolicy getMultipleLinearRandomRetry(Configuration conf) {
+    final boolean enabled = conf.getBoolean(
+        DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_ENABLED_KEY,
+        DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_ENABLED_DEFAULT);
+    if (!enabled) {
+      return null;
+    }
+
+    final String policy = conf.get(
+        DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_SPEC_KEY,
+        DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_SPEC_DEFAULT);
+
+    final RetryPolicy r = RetryPolicies.MultipleLinearRandomRetry.parseCommaSeparatedString(policy);
+    return r != null? r: RetryPolicies.MultipleLinearRandomRetry.parseCommaSeparatedString(
+        DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_SPEC_DEFAULT);
+  }
+
   private static ClientProtocol createNNProxyWithClientProtocol(
       InetSocketAddress address, Configuration conf, UserGroupInformation ugi,
       boolean withRetries) throws IOException {
-    ClientNamenodeProtocolPB proxy = (ClientNamenodeProtocolPB) NameNodeProxies
-        .createNameNodeProxy(address, conf, ugi, ClientNamenodeProtocolPB.class, 0);
+    RPC.setProtocolEngine(conf, ClientNamenodeProtocolPB.class, ProtobufRpcEngine.class);
+
+    final RetryPolicy defaultPolicy = getDefaultRpcRetryPolicy(conf);
+    final long version = RPC.getProtocolVersion(ClientNamenodeProtocolPB.class);
+    ClientNamenodeProtocolPB proxy = RPC.getProtocolProxy(
+        ClientNamenodeProtocolPB.class, version, address, ugi, conf,
+        NetUtils.getDefaultSocketFactory(conf), 0, defaultPolicy).getProxy();
+
     if (withRetries) { // create the proxy with retries
+
       RetryPolicy createPolicy = RetryPolicies
           .retryUpToMaximumCountWithFixedSleep(5,
               HdfsConstants.LEASE_SOFTLIMIT_PERIOD, TimeUnit.MILLISECONDS);
@@ -258,17 +355,21 @@ public class NameNodeProxies {
       Map<Class<? extends Exception>, RetryPolicy> exceptionToPolicyMap
                  = new HashMap<Class<? extends Exception>, RetryPolicy>();
       exceptionToPolicyMap.put(RemoteException.class, RetryPolicies
-          .retryByRemoteException(RetryPolicies.TRY_ONCE_THEN_FAIL,
+          .retryByRemoteException(defaultPolicy,
               remoteExceptionToPolicyMap));
       RetryPolicy methodPolicy = RetryPolicies.retryByException(
-          RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap);
+          defaultPolicy, exceptionToPolicyMap);
       Map<String, RetryPolicy> methodNameToPolicyMap 
                  = new HashMap<String, RetryPolicy>();
     
       methodNameToPolicyMap.put("create", methodPolicy);
     
-      proxy = (ClientNamenodeProtocolPB) RetryProxy
-          .create(ClientNamenodeProtocolPB.class, proxy, methodNameToPolicyMap);
+      proxy = (ClientNamenodeProtocolPB) RetryProxy.create(
+          ClientNamenodeProtocolPB.class,
+          new DefaultFailoverProxyProvider<ClientNamenodeProtocolPB>(
+              ClientNamenodeProtocolPB.class, proxy),
+          methodNameToPolicyMap,
+          defaultPolicy);
     }
     return new ClientNamenodeProtocolTranslatorPB(proxy);
   }

+ 4 - 7
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java

@@ -25,8 +25,6 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HOST_NAME_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTP_ADDRESS_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_IPC_ADDRESS_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMESERVICES;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMESERVICE_ID;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_LOGROLL_PERIOD_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODE_ID_KEY;
@@ -39,6 +37,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SAFEMODE_EXTENSION_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMESERVICES;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMESERVICE_ID;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_KEY;
 import static org.apache.hadoop.hdfs.server.common.Util.fileAsURI;
 
@@ -66,12 +66,9 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.ha.HAServiceProtocol;
+import org.apache.hadoop.ha.HAServiceProtocol.RequestSource;
 import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo;
-import org.apache.hadoop.ha.HAServiceProtocolHelper;
 import org.apache.hadoop.ha.ServiceFailedException;
-import org.apache.hadoop.ha.HAServiceProtocol.RequestSource;
-import org.apache.hadoop.ha.protocolPB.HAServiceProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdfs.MiniDFSNNTopology.NNConf;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
@@ -1401,7 +1398,6 @@ public class MiniDFSCluster {
       waitClusterUp();
       LOG.info("Restarted the namenode");
       waitActive();
-      LOG.info("Cluster is active");
     }
   }
 
@@ -1777,6 +1773,7 @@ public class MiniDFSCluster {
         }
       }
     }
+    LOG.info("Cluster is active");
   }
   
   private synchronized boolean shouldWait(DatanodeInfo[] dnInfo,

+ 161 - 10
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java

@@ -25,46 +25,53 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;
 
-import java.net.SocketTimeoutException;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.LongWritable;
+import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.net.InetSocketAddress;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.net.SocketTimeoutException;
 import java.security.MessageDigest;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 
 import junit.framework.TestCase;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileChecksum;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.UnresolvedLinkException;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
-import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.retry.RetryPolicies.MultipleLinearRandomRetry;
 import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.log4j.Level;
 import org.mockito.Mockito;
 import org.mockito.internal.stubbing.answers.ThrowsException;
 import org.mockito.invocation.InvocationOnMock;
@@ -341,7 +348,7 @@ public class TestDFSClientRetries extends TestCase {
 
           // We shouldn't have gained an extra block by the RPC.
           assertEquals(blockCount, blockCount2);
-          return (LocatedBlock) ret2;
+          return ret2;
         }
       }).when(spyNN).addBlock(Mockito.anyString(), Mockito.anyString(),
           Mockito.<ExtendedBlock>any(), Mockito.<DatanodeInfo[]>any());
@@ -798,5 +805,149 @@ public class TestDFSClientRetries extends TestCase {
       cluster.shutdown();
     }
   }
-}
 
+  /** Test client retry with namenode restarting. */
+  public void testNamenodeRestart() throws Exception {
+    ((Log4JLogger)DFSClient.LOG).getLogger().setLevel(Level.ALL);
+
+    final List<Exception> exceptions = new ArrayList<Exception>();
+
+    final Path dir = new Path("/testNamenodeRestart");
+
+    final Configuration conf = new Configuration();
+    conf.setBoolean(DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_ENABLED_KEY, true);
+
+    final short numDatanodes = 3;
+    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(numDatanodes)
+        .build();
+    try {
+      cluster.waitActive();
+
+      //create a file
+      final DistributedFileSystem dfs = cluster.getFileSystem();
+      final long length = 1L << 20;
+      final Path file1 = new Path(dir, "foo"); 
+      DFSTestUtil.createFile(dfs, file1, length, numDatanodes, 20120406L);
+
+      //get file status
+      final FileStatus s1 = dfs.getFileStatus(file1);
+      assertEquals(length, s1.getLen());
+
+      //shutdown namenode
+      cluster.shutdownNameNode(0);
+
+      //namenode is down, create another file in a thread
+      final Path file3 = new Path(dir, "file"); 
+      final Thread thread = new Thread(new Runnable() {
+        @Override
+        public void run() {
+          try {
+            //it should retry till namenode is up.
+            final FileSystem fs = AppendTestUtil.createHdfsWithDifferentUsername(conf);
+            DFSTestUtil.createFile(fs, file3, length, numDatanodes, 20120406L);
+          } catch (Exception e) {
+            exceptions.add(e);
+          }
+        }
+      });
+      thread.start();
+
+      //restart namenode in a new thread
+      new Thread(new Runnable() {
+        @Override
+        public void run() {
+          try {
+            //sleep, restart, and then wait active
+            TimeUnit.SECONDS.sleep(30);
+            cluster.restartNameNode(0, false);
+            cluster.waitActive();
+          } catch (Exception e) {
+            exceptions.add(e);
+          }
+        }
+      }).start();
+
+      //namenode is down, it should retry until namenode is up again. 
+      final FileStatus s2 = dfs.getFileStatus(file1);
+      assertEquals(s1, s2);
+
+      //check file1 and file3
+      thread.join();
+      assertEquals(dfs.getFileChecksum(file1), dfs.getFileChecksum(file3));
+
+      //enter safe mode
+      dfs.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
+      
+      //leave safe mode in a new thread
+      new Thread(new Runnable() {
+        @Override
+        public void run() {
+          try {
+            //sleep and then leave safe mode
+            TimeUnit.SECONDS.sleep(30);
+            dfs.setSafeMode(SafeModeAction.SAFEMODE_LEAVE);
+          } catch (Exception e) {
+            exceptions.add(e);
+          }
+        }
+      }).start();
+
+      //namenode is in safe mode, create should retry until it leaves safe mode.
+      final Path file2 = new Path(dir, "bar");
+      DFSTestUtil.createFile(dfs, file2, length, numDatanodes, 20120406L);
+      assertEquals(dfs.getFileChecksum(file1), dfs.getFileChecksum(file2));
+      
+      //make sure it won't retry on exceptions like FileNotFoundException
+      final Path nonExisting = new Path(dir, "nonExisting");
+      LOG.info("setPermission: " + nonExisting);
+      try {
+        dfs.setPermission(nonExisting, new FsPermission((short)0));
+        fail();
+      } catch(FileNotFoundException fnfe) {
+        LOG.info("GOOD!", fnfe);
+      }
+
+      if (!exceptions.isEmpty()) {
+        LOG.error("There are " + exceptions.size() + " exception(s):");
+        for(int i = 0; i < exceptions.size(); i++) {
+          LOG.error("Exception " + i, exceptions.get(i));
+        }
+        fail();
+      }
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
+  public void testMultipleLinearRandomRetry() {
+    parseMultipleLinearRandomRetry(null, "");
+    parseMultipleLinearRandomRetry(null, "11");
+    parseMultipleLinearRandomRetry(null, "11,22,33");
+    parseMultipleLinearRandomRetry(null, "11,22,33,44,55");
+    parseMultipleLinearRandomRetry(null, "AA");
+    parseMultipleLinearRandomRetry(null, "11,AA");
+    parseMultipleLinearRandomRetry(null, "11,22,33,FF");
+    parseMultipleLinearRandomRetry(null, "11,-22");
+    parseMultipleLinearRandomRetry(null, "-11,22");
+
+    parseMultipleLinearRandomRetry("[22x11ms]",
+        "11,22");
+    parseMultipleLinearRandomRetry("[22x11ms, 44x33ms]",
+        "11,22,33,44");
+    parseMultipleLinearRandomRetry("[22x11ms, 44x33ms, 66x55ms]",
+        "11,22,33,44,55,66");
+    parseMultipleLinearRandomRetry("[22x11ms, 44x33ms, 66x55ms]",
+        "   11,   22, 33,  44, 55,  66   ");
+  }
+  
+  static void parseMultipleLinearRandomRetry(String expected, String s) {
+    final MultipleLinearRandomRetry r = MultipleLinearRandomRetry.parseCommaSeparatedString(s);
+    LOG.info("input=" + s + ", parsed=" + r + ", expected=" + expected);
+    if (r == null) {
+      assertEquals(expected, null);
+    } else {
+      assertEquals("MultipleLinearRandomRetry" + expected, r.toString());
+    }
+  }
+}