Sfoglia il codice sorgente

HDFS-7858. Improve HA Namenode Failover detection on the client. (asuresh)

(cherry picked from commit 030fcfa99c345ad57625486eeabedebf2fd4411f)
Arun Suresh 9 anni fa
parent
commit
ffe6a34d5b

+ 49 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/MultiException.java

@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.io.retry;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * Holder class that clients can use to return multiple exceptions.
+ */
+public class MultiException extends IOException {
+
+  private final Map<String, Exception> exes;
+
+  public MultiException(Map<String, Exception> exes) {
+    this.exes = exes;
+  }
+
+  public Map<String, Exception> getExceptions() {
+    return exes;
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder("{");
+    for (Exception e : exes.values()) {
+      sb.append(e.toString()).append(", ");
+    }
+    sb.append("}");
+    return "MultiException[" + sb.toString() + "]";
+  }
+}

+ 82 - 17
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java

@@ -23,6 +23,8 @@ import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.lang.reflect.Method;
 import java.lang.reflect.Proxy;
 import java.lang.reflect.Proxy;
 import java.util.Collections;
 import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
 import java.util.Map;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicLong;
 
 
@@ -102,7 +104,7 @@ public class RetryInvocationHandler<T> implements RpcInvocationHandler {
         Object ret = invokeMethod(method, args);
         Object ret = invokeMethod(method, args);
         hasMadeASuccessfulCall = true;
         hasMadeASuccessfulCall = true;
         return ret;
         return ret;
-      } catch (Exception e) {
+      } catch (Exception ex) {
         boolean isIdempotentOrAtMostOnce = proxyProvider.getInterface()
         boolean isIdempotentOrAtMostOnce = proxyProvider.getInterface()
             .getMethod(method.getName(), method.getParameterTypes())
             .getMethod(method.getName(), method.getParameterTypes())
             .isAnnotationPresent(Idempotent.class);
             .isAnnotationPresent(Idempotent.class);
@@ -111,15 +113,16 @@ public class RetryInvocationHandler<T> implements RpcInvocationHandler {
               .getMethod(method.getName(), method.getParameterTypes())
               .getMethod(method.getName(), method.getParameterTypes())
               .isAnnotationPresent(AtMostOnce.class);
               .isAnnotationPresent(AtMostOnce.class);
         }
         }
-        RetryAction action = policy.shouldRetry(e, retries++,
-            invocationFailoverCount, isIdempotentOrAtMostOnce);
-        if (action.action == RetryAction.RetryDecision.FAIL) {
-          if (action.reason != null) {
+        List<RetryAction> actions = extractActions(policy, ex, retries++,
+                invocationFailoverCount, isIdempotentOrAtMostOnce);
+        RetryAction failAction = getFailAction(actions);
+        if (failAction != null) {
+          if (failAction.reason != null) {
             LOG.warn("Exception while invoking " + currentProxy.proxy.getClass()
             LOG.warn("Exception while invoking " + currentProxy.proxy.getClass()
                 + "." + method.getName() + " over " + currentProxy.proxyInfo
                 + "." + method.getName() + " over " + currentProxy.proxyInfo
-                + ". Not retrying because " + action.reason, e);
+                + ". Not retrying because " + failAction.reason, ex);
           }
           }
-          throw e;
+          throw ex;
         } else { // retry or failover
         } else { // retry or failover
           // avoid logging the failover if this is the first call on this
           // avoid logging the failover if this is the first call on this
           // proxy object, and we successfully achieve the failover without
           // proxy object, and we successfully achieve the failover without
@@ -127,8 +130,9 @@ public class RetryInvocationHandler<T> implements RpcInvocationHandler {
           boolean worthLogging = 
           boolean worthLogging = 
             !(invocationFailoverCount == 0 && !hasMadeASuccessfulCall);
             !(invocationFailoverCount == 0 && !hasMadeASuccessfulCall);
           worthLogging |= LOG.isDebugEnabled();
           worthLogging |= LOG.isDebugEnabled();
-          if (action.action == RetryAction.RetryDecision.FAILOVER_AND_RETRY &&
-              worthLogging) {
+          RetryAction failOverAction = getFailOverAction(actions);
+          long delay = getDelayMillis(actions);
+          if (failOverAction != null && worthLogging) {
             String msg = "Exception while invoking " + method.getName()
             String msg = "Exception while invoking " + method.getName()
                 + " of class " + currentProxy.proxy.getClass().getSimpleName()
                 + " of class " + currentProxy.proxy.getClass().getSimpleName()
                 + " over " + currentProxy.proxyInfo;
                 + " over " + currentProxy.proxyInfo;
@@ -136,22 +140,22 @@ public class RetryInvocationHandler<T> implements RpcInvocationHandler {
             if (invocationFailoverCount > 0) {
             if (invocationFailoverCount > 0) {
               msg += " after " + invocationFailoverCount + " fail over attempts"; 
               msg += " after " + invocationFailoverCount + " fail over attempts"; 
             }
             }
-            msg += ". Trying to fail over " + formatSleepMessage(action.delayMillis);
-            LOG.info(msg, e);
+            msg += ". Trying to fail over " + formatSleepMessage(delay);
+            LOG.info(msg, ex);
           } else {
           } else {
             if(LOG.isDebugEnabled()) {
             if(LOG.isDebugEnabled()) {
               LOG.debug("Exception while invoking " + method.getName()
               LOG.debug("Exception while invoking " + method.getName()
                   + " of class " + currentProxy.proxy.getClass().getSimpleName()
                   + " of class " + currentProxy.proxy.getClass().getSimpleName()
                   + " over " + currentProxy.proxyInfo + ". Retrying "
                   + " over " + currentProxy.proxyInfo + ". Retrying "
-                  + formatSleepMessage(action.delayMillis), e);
+                  + formatSleepMessage(delay), ex);
             }
             }
           }
           }
-          
-          if (action.delayMillis > 0) {
-            Thread.sleep(action.delayMillis);
+
+          if (delay > 0) {
+            Thread.sleep(delay);
           }
           }
           
           
-          if (action.action == RetryAction.RetryDecision.FAILOVER_AND_RETRY) {
+          if (failOverAction != null) {
             // Make sure that concurrent failed method invocations only cause a
             // Make sure that concurrent failed method invocations only cause a
             // single actual fail over.
             // single actual fail over.
             synchronized (proxyProvider) {
             synchronized (proxyProvider) {
@@ -170,7 +174,68 @@ public class RetryInvocationHandler<T> implements RpcInvocationHandler {
       }
       }
     }
     }
   }
   }
-  
+
+  /**
+   * Obtain a retry delay from list of RetryActions.
+   */
+  private long getDelayMillis(List<RetryAction> actions) {
+    long retVal = 0;
+    for (RetryAction action : actions) {
+      if (action.action == RetryAction.RetryDecision.FAILOVER_AND_RETRY ||
+              action.action == RetryAction.RetryDecision.RETRY) {
+        if (action.delayMillis > retVal) {
+          retVal = action.delayMillis;
+        }
+      }
+    }
+    return retVal;
+  }
+
+  /**
+   * Return the first FAILOVER_AND_RETRY action.
+   */
+  private RetryAction getFailOverAction(List<RetryAction> actions) {
+    for (RetryAction action : actions) {
+      if (action.action == RetryAction.RetryDecision.FAILOVER_AND_RETRY) {
+        return action;
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Return the last FAIL action.. only if there are no RETRY actions.
+   */
+  private RetryAction getFailAction(List<RetryAction> actions) {
+    RetryAction fAction = null;
+    for (RetryAction action : actions) {
+      if (action.action == RetryAction.RetryDecision.FAIL) {
+        fAction = action;
+      } else {
+        // Atleast 1 RETRY
+        return null;
+      }
+    }
+    return fAction;
+  }
+
+  private List<RetryAction> extractActions(RetryPolicy policy, Exception ex,
+                                           int i, int invocationFailoverCount,
+                                           boolean isIdempotentOrAtMostOnce)
+          throws Exception {
+    List<RetryAction> actions = new LinkedList<>();
+    if (ex instanceof MultiException) {
+      for (Exception th : ((MultiException) ex).getExceptions().values()) {
+        actions.add(policy.shouldRetry(th, i, invocationFailoverCount,
+                isIdempotentOrAtMostOnce));
+      }
+    } else {
+      actions.add(policy.shouldRetry(ex, i,
+              invocationFailoverCount, isIdempotentOrAtMostOnce));
+    }
+    return actions;
+  }
+
   private static String formatSleepMessage(long millis) {
   private static String formatSleepMessage(long millis) {
     if (millis > 0) {
     if (millis > 0) {
       return "after sleeping for " + millis + "ms.";
       return "after sleeping for " + millis + "ms.";

+ 2 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -410,6 +410,8 @@ Release 2.8.0 - UNRELEASED
     HDFS-8735. Inotify: All events classes should implement toString() API.
     HDFS-8735. Inotify: All events classes should implement toString() API.
     (Surendra Singh Lilhore via aajisaka)
     (Surendra Singh Lilhore via aajisaka)
 
 
+    HDFS-7858. Improve HA Namenode Failover detection on the client. (asuresh)
+
   OPTIMIZATIONS
   OPTIMIZATIONS
 
 
     HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than
     HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than

+ 42 - 10
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ConfiguredFailoverProxyProvider.java

@@ -25,6 +25,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collection;
 import java.util.List;
 import java.util.List;
 import java.util.Map;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.LogFactory;
@@ -38,6 +39,7 @@ import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.UserGroupInformation;
 
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Preconditions;
 
 
 /**
 /**
@@ -51,16 +53,40 @@ public class ConfiguredFailoverProxyProvider<T> extends
   private static final Log LOG =
   private static final Log LOG =
       LogFactory.getLog(ConfiguredFailoverProxyProvider.class);
       LogFactory.getLog(ConfiguredFailoverProxyProvider.class);
   
   
-  private final Configuration conf;
-  private final List<AddressRpcProxyPair<T>> proxies =
+  interface ProxyFactory<T> {
+    T createProxy(Configuration conf, InetSocketAddress nnAddr, Class<T> xface,
+        UserGroupInformation ugi, boolean withRetries,
+        AtomicBoolean fallbackToSimpleAuth) throws IOException;
+  }
+
+  static class DefaultProxyFactory<T> implements ProxyFactory<T> {
+    @Override
+    public T createProxy(Configuration conf, InetSocketAddress nnAddr,
+        Class<T> xface, UserGroupInformation ugi, boolean withRetries,
+        AtomicBoolean fallbackToSimpleAuth) throws IOException {
+      return NameNodeProxies.createNonHAProxy(conf,
+          nnAddr, xface, ugi, false, fallbackToSimpleAuth).getProxy();
+    }
+  }
+
+  protected final Configuration conf;
+  protected final List<AddressRpcProxyPair<T>> proxies =
       new ArrayList<AddressRpcProxyPair<T>>();
       new ArrayList<AddressRpcProxyPair<T>>();
   private final UserGroupInformation ugi;
   private final UserGroupInformation ugi;
-  private final Class<T> xface;
-  
+  protected final Class<T> xface;
+
   private int currentProxyIndex = 0;
   private int currentProxyIndex = 0;
+  private final ProxyFactory<T> factory;
 
 
   public ConfiguredFailoverProxyProvider(Configuration conf, URI uri,
   public ConfiguredFailoverProxyProvider(Configuration conf, URI uri,
       Class<T> xface) {
       Class<T> xface) {
+    this(conf, uri, xface, new DefaultProxyFactory<T>());
+  }
+
+  @VisibleForTesting
+  ConfiguredFailoverProxyProvider(Configuration conf, URI uri,
+      Class<T> xface, ProxyFactory<T> factory) {
+
     Preconditions.checkArgument(
     Preconditions.checkArgument(
         xface.isAssignableFrom(NamenodeProtocols.class),
         xface.isAssignableFrom(NamenodeProtocols.class),
         "Interface class %s is not a valid NameNode protocol!");
         "Interface class %s is not a valid NameNode protocol!");
@@ -78,9 +104,10 @@ public class ConfiguredFailoverProxyProvider<T> extends
         HdfsClientConfigKeys.Failover.CONNECTION_RETRIES_ON_SOCKET_TIMEOUTS_KEY,
         HdfsClientConfigKeys.Failover.CONNECTION_RETRIES_ON_SOCKET_TIMEOUTS_KEY,
         HdfsClientConfigKeys.Failover.CONNECTION_RETRIES_ON_SOCKET_TIMEOUTS_DEFAULT);
         HdfsClientConfigKeys.Failover.CONNECTION_RETRIES_ON_SOCKET_TIMEOUTS_DEFAULT);
     this.conf.setInt(
     this.conf.setInt(
-        CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY,
-        maxRetriesOnSocketTimeouts);
-    
+            CommonConfigurationKeysPublic
+                    .IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY,
+            maxRetriesOnSocketTimeouts);
+
     try {
     try {
       ugi = UserGroupInformation.getCurrentUser();
       ugi = UserGroupInformation.getCurrentUser();
       
       
@@ -102,6 +129,7 @@ public class ConfiguredFailoverProxyProvider<T> extends
       // URI of the cluster. Clone this token to apply to each of the
       // URI of the cluster. Clone this token to apply to each of the
       // underlying IPC addresses so that the IPC code can find it.
       // underlying IPC addresses so that the IPC code can find it.
       HAUtil.cloneDelegationTokenForLogicalUri(ugi, uri, addressesOfNns);
       HAUtil.cloneDelegationTokenForLogicalUri(ugi, uri, addressesOfNns);
+      this.factory = factory;
     } catch (IOException e) {
     } catch (IOException e) {
       throw new RuntimeException(e);
       throw new RuntimeException(e);
     }
     }
@@ -120,8 +148,8 @@ public class ConfiguredFailoverProxyProvider<T> extends
     AddressRpcProxyPair<T> current = proxies.get(currentProxyIndex);
     AddressRpcProxyPair<T> current = proxies.get(currentProxyIndex);
     if (current.namenode == null) {
     if (current.namenode == null) {
       try {
       try {
-        current.namenode = NameNodeProxies.createNonHAProxy(conf,
-            current.address, xface, ugi, false, fallbackToSimpleAuth).getProxy();
+        current.namenode = factory.createProxy(conf,
+            current.address, xface, ugi, false, fallbackToSimpleAuth);
       } catch (IOException e) {
       } catch (IOException e) {
         LOG.error("Failed to create RPC proxy to NameNode", e);
         LOG.error("Failed to create RPC proxy to NameNode", e);
         throw new RuntimeException(e);
         throw new RuntimeException(e);
@@ -131,7 +159,11 @@ public class ConfiguredFailoverProxyProvider<T> extends
   }
   }
 
 
   @Override
   @Override
-  public synchronized void performFailover(T currentProxy) {
+  public  void performFailover(T currentProxy) {
+    incrementProxyIndex();
+  }
+
+  synchronized void incrementProxyIndex() {
     currentProxyIndex = (currentProxyIndex + 1) % proxies.size();
     currentProxyIndex = (currentProxyIndex + 1) % proxies.size();
   }
   }
 
 

+ 186 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/RequestHedgingProxyProvider.java

@@ -0,0 +1,186 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode.ha;
+
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.net.URI;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.io.retry.MultiException;
+
+/**
+ * A FailoverProxyProvider implementation that technically does not "failover"
+ * per-se. It constructs a wrapper proxy that sends the request to ALL
+ * underlying proxies simultaneously. It assumes the in an HA setup, there will
+ * be only one Active, and the active should respond faster than any configured
+ * standbys. Once it recieve a response from any one of the configred proxies,
+ * outstanding requests to other proxies are immediately cancelled.
+ */
+public class RequestHedgingProxyProvider<T> extends
+        ConfiguredFailoverProxyProvider<T> {
+
+  private static final Log LOG =
+          LogFactory.getLog(RequestHedgingProxyProvider.class);
+
+  class RequestHedgingInvocationHandler implements InvocationHandler {
+
+    final Map<String, ProxyInfo<T>> targetProxies;
+
+    public RequestHedgingInvocationHandler(
+            Map<String, ProxyInfo<T>> targetProxies) {
+      this.targetProxies = new HashMap<>(targetProxies);
+    }
+
+    /**
+     * Creates a Executor and invokes all proxies concurrently. This
+     * implementation assumes that Clients have configured proper socket
+     * timeouts, else the call can block forever.
+     *
+     * @param proxy
+     * @param method
+     * @param args
+     * @return
+     * @throws Throwable
+     */
+    @Override
+    public Object
+    invoke(Object proxy, final Method method, final Object[] args)
+            throws Throwable {
+      Map<Future<Object>, ProxyInfo<T>> proxyMap = new HashMap<>();
+      int numAttempts = 0;
+
+      ExecutorService executor = null;
+      CompletionService<Object> completionService;
+      try {
+        // Optimization : if only 2 proxies are configured and one had failed
+        // over, then we dont need to create a threadpool etc.
+        targetProxies.remove(toIgnore);
+        if (targetProxies.size() == 1) {
+          ProxyInfo<T> proxyInfo = targetProxies.values().iterator().next();
+          Object retVal = method.invoke(proxyInfo.proxy, args);
+          successfulProxy = proxyInfo;
+          return retVal;
+        }
+        executor = Executors.newFixedThreadPool(proxies.size());
+        completionService = new ExecutorCompletionService<>(executor);
+        for (final Map.Entry<String, ProxyInfo<T>> pEntry :
+                targetProxies.entrySet()) {
+          Callable<Object> c = new Callable<Object>() {
+            @Override
+            public Object call() throws Exception {
+              return method.invoke(pEntry.getValue().proxy, args);
+            }
+          };
+          proxyMap.put(completionService.submit(c), pEntry.getValue());
+          numAttempts++;
+        }
+
+        Map<String, Exception> badResults = new HashMap<>();
+        while (numAttempts > 0) {
+          Future<Object> callResultFuture = completionService.take();
+          Object retVal;
+          try {
+            retVal = callResultFuture.get();
+            successfulProxy = proxyMap.get(callResultFuture);
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Invocation successful on ["
+                      + successfulProxy.proxyInfo + "]");
+            }
+            return retVal;
+          } catch (Exception ex) {
+            ProxyInfo<T> tProxyInfo = proxyMap.get(callResultFuture);
+            LOG.warn("Invocation returned exception on "
+                    + "[" + tProxyInfo.proxyInfo + "]");
+            badResults.put(tProxyInfo.proxyInfo, ex);
+            numAttempts--;
+          }
+        }
+
+        // At this point we should have All bad results (Exceptions)
+        // Or should have returned with successful result.
+        if (badResults.size() == 1) {
+          throw badResults.values().iterator().next();
+        } else {
+          throw new MultiException(badResults);
+        }
+      } finally {
+        if (executor != null) {
+          executor.shutdownNow();
+        }
+      }
+    }
+  }
+
+
+  private volatile ProxyInfo<T> successfulProxy = null;
+  private volatile String toIgnore = null;
+
+  public RequestHedgingProxyProvider(
+          Configuration conf, URI uri, Class<T> xface) {
+    this(conf, uri, xface, new DefaultProxyFactory<T>());
+  }
+
+  @VisibleForTesting
+  RequestHedgingProxyProvider(Configuration conf, URI uri,
+                              Class<T> xface, ProxyFactory<T> factory) {
+    super(conf, uri, xface, factory);
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public synchronized ProxyInfo<T> getProxy() {
+    if (successfulProxy != null) {
+      return successfulProxy;
+    }
+    Map<String, ProxyInfo<T>> targetProxyInfos = new HashMap<>();
+    StringBuilder combinedInfo = new StringBuilder('[');
+    for (int i = 0; i < proxies.size(); i++) {
+      ProxyInfo<T> pInfo = super.getProxy();
+      incrementProxyIndex();
+      targetProxyInfos.put(pInfo.proxyInfo, pInfo);
+      combinedInfo.append(pInfo.proxyInfo).append(',');
+    }
+    combinedInfo.append(']');
+    T wrappedProxy = (T) Proxy.newProxyInstance(
+            RequestHedgingInvocationHandler.class.getClassLoader(),
+            new Class<?>[]{xface},
+            new RequestHedgingInvocationHandler(targetProxyInfos));
+    return new ProxyInfo<T>(wrappedProxy, combinedInfo.toString());
+  }
+
+  @Override
+  public synchronized void performFailover(T currentProxy) {
+    toIgnore = successfulProxy.proxyInfo;
+    successfulProxy = null;
+  }
+
+}

+ 6 - 3
hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSHighAvailabilityWithNFS.md

@@ -189,9 +189,12 @@ The order in which you set these configurations is unimportant, but the values y
 
 
     Configure the name of the Java class which will be used by the DFS Client to
     Configure the name of the Java class which will be used by the DFS Client to
     determine which NameNode is the current Active, and therefore which NameNode is
     determine which NameNode is the current Active, and therefore which NameNode is
-    currently serving client requests. The only implementation which currently
-    ships with Hadoop is the **ConfiguredFailoverProxyProvider**, so use this
-    unless you are using a custom one. For example:
+    currently serving client requests. The two implementations which currently
+    ship with Hadoop are the **ConfiguredFailoverProxyProvider** and the
+    **RequestHedgingProxyProvider** (which, for the first call, concurrently invokes all
+    namenodes to determine the active one, and on subsequent requests, invokes the active
+    namenode until a fail-over happens), so use one of these unless you are using a custom
+    proxy provider.
 
 
         <property>
         <property>
           <name>dfs.client.failover.proxy.provider.mycluster</name>
           <name>dfs.client.failover.proxy.provider.mycluster</name>

+ 7 - 3
hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSHighAvailabilityWithQJM.md

@@ -208,9 +208,13 @@ The order in which you set these configurations is unimportant, but the values y
 
 
     Configure the name of the Java class which will be used by the DFS Client to
     Configure the name of the Java class which will be used by the DFS Client to
     determine which NameNode is the current Active, and therefore which NameNode is
     determine which NameNode is the current Active, and therefore which NameNode is
-    currently serving client requests. The only implementation which currently
-    ships with Hadoop is the **ConfiguredFailoverProxyProvider**, so use this
-    unless you are using a custom one. For example:
+    currently serving client requests. The two implementations which currently
+    ship with Hadoop are the **ConfiguredFailoverProxyProvider** and the
+    **RequestHedgingProxyProvider** (which, for the first call, concurrently invokes all
+    namenodes to determine the active one, and on subsequent requests, invokes the active
+    namenode until a fail-over happens), so use one of these unless you are using a custom
+    proxy provider.
+    For example:
 
 
         <property>
         <property>
           <name>dfs.client.failover.proxy.provider.mycluster</name>
           <name>dfs.client.failover.proxy.provider.mycluster</name>

+ 350 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRequestHedgingProxyProvider.java

@@ -0,0 +1,350 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode.ha;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Iterator;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider.ProxyFactory;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
+import org.apache.hadoop.io.retry.MultiException;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.Time;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import com.google.common.collect.Lists;
+
+public class TestRequestHedgingProxyProvider {
+
+  private Configuration conf;
+  private URI nnUri;
+  private String ns;
+
+  @Before
+  public void setup() throws URISyntaxException {
+    ns = "mycluster-" + Time.monotonicNow();
+    nnUri = new URI("hdfs://" + ns);
+    conf = new Configuration();
+    conf.set(DFSConfigKeys.DFS_NAMESERVICES, ns);
+    conf.set(
+        DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX + "." + ns, "nn1,nn2");
+    conf.set(
+        DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY + "." + ns + ".nn1",
+        "machine1.foo.bar:8020");
+    conf.set(
+        DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY + "." + ns + ".nn2",
+        "machine2.foo.bar:8020");
+  }
+
+  @Test
+  public void testHedgingWhenOneFails() throws Exception {
+    final NamenodeProtocols goodMock = Mockito.mock(NamenodeProtocols.class);
+    Mockito.when(goodMock.getStats()).thenReturn(new long[] {1});
+    final NamenodeProtocols badMock = Mockito.mock(NamenodeProtocols.class);
+    Mockito.when(badMock.getStats()).thenThrow(new IOException("Bad mock !!"));
+
+    RequestHedgingProxyProvider<NamenodeProtocols> provider =
+        new RequestHedgingProxyProvider<>(conf, nnUri, NamenodeProtocols.class,
+            createFactory(goodMock, badMock));
+    long[] stats = provider.getProxy().proxy.getStats();
+    Assert.assertTrue(stats.length == 1);
+    Mockito.verify(badMock).getStats();
+    Mockito.verify(goodMock).getStats();
+  }
+
+  @Test
+  public void testHedgingWhenOneIsSlow() throws Exception {
+    final NamenodeProtocols goodMock = Mockito.mock(NamenodeProtocols.class);
+    Mockito.when(goodMock.getStats()).thenAnswer(new Answer<long[]>() {
+      @Override
+      public long[] answer(InvocationOnMock invocation) throws Throwable {
+        Thread.sleep(1000);
+        return new long[]{1};
+      }
+    });
+    final NamenodeProtocols badMock = Mockito.mock(NamenodeProtocols.class);
+    Mockito.when(badMock.getStats()).thenThrow(new IOException("Bad mock !!"));
+
+    RequestHedgingProxyProvider<NamenodeProtocols> provider =
+        new RequestHedgingProxyProvider<>(conf, nnUri, NamenodeProtocols.class,
+            createFactory(goodMock, badMock));
+    long[] stats = provider.getProxy().proxy.getStats();
+    Assert.assertTrue(stats.length == 1);
+    Assert.assertEquals(1, stats[0]);
+    Mockito.verify(badMock).getStats();
+    Mockito.verify(goodMock).getStats();
+  }
+
+  @Test
+  public void testHedgingWhenBothFail() throws Exception {
+    NamenodeProtocols badMock = Mockito.mock(NamenodeProtocols.class);
+    Mockito.when(badMock.getStats()).thenThrow(new IOException("Bad mock !!"));
+    NamenodeProtocols worseMock = Mockito.mock(NamenodeProtocols.class);
+    Mockito.when(worseMock.getStats()).thenThrow(
+            new IOException("Worse mock !!"));
+
+    RequestHedgingProxyProvider<NamenodeProtocols> provider =
+        new RequestHedgingProxyProvider<>(conf, nnUri, NamenodeProtocols.class,
+            createFactory(badMock, worseMock));
+    try {
+      provider.getProxy().proxy.getStats();
+      Assert.fail("Should fail since both namenodes throw IOException !!");
+    } catch (Exception e) {
+      Assert.assertTrue(e instanceof MultiException);
+    }
+    Mockito.verify(badMock).getStats();
+    Mockito.verify(worseMock).getStats();
+  }
+
+  @Test
+  public void testPerformFailover() throws Exception {
+    final AtomicInteger counter = new AtomicInteger(0);
+    final int[] isGood = {1};
+    final NamenodeProtocols goodMock = Mockito.mock(NamenodeProtocols.class);
+    Mockito.when(goodMock.getStats()).thenAnswer(new Answer<long[]>() {
+      @Override
+      public long[] answer(InvocationOnMock invocation) throws Throwable {
+        counter.incrementAndGet();
+        if (isGood[0] == 1) {
+          Thread.sleep(1000);
+          return new long[]{1};
+        }
+        throw new IOException("Was Good mock !!");
+      }
+    });
+    final NamenodeProtocols badMock = Mockito.mock(NamenodeProtocols.class);
+    Mockito.when(badMock.getStats()).thenAnswer(new Answer<long[]>() {
+      @Override
+      public long[] answer(InvocationOnMock invocation) throws Throwable {
+        counter.incrementAndGet();
+        if (isGood[0] == 2) {
+          Thread.sleep(1000);
+          return new long[]{2};
+        }
+        throw new IOException("Bad mock !!");
+      }
+    });
+    RequestHedgingProxyProvider<NamenodeProtocols> provider =
+            new RequestHedgingProxyProvider<>(conf, nnUri, NamenodeProtocols.class,
+                    createFactory(goodMock, badMock));
+    long[] stats = provider.getProxy().proxy.getStats();
+    Assert.assertTrue(stats.length == 1);
+    Assert.assertEquals(1, stats[0]);
+    Assert.assertEquals(2, counter.get());
+    Mockito.verify(badMock).getStats();
+    Mockito.verify(goodMock).getStats();
+
+    stats = provider.getProxy().proxy.getStats();
+    Assert.assertTrue(stats.length == 1);
+    Assert.assertEquals(1, stats[0]);
+    // Ensure only the previous successful one is invoked
+    Mockito.verifyNoMoreInteractions(badMock);
+    Assert.assertEquals(3, counter.get());
+
+    // Flip to standby.. so now this should fail
+    isGood[0] = 2;
+    try {
+      provider.getProxy().proxy.getStats();
+      Assert.fail("Should fail since previously successful proxy now fails ");
+    } catch (Exception ex) {
+      Assert.assertTrue(ex instanceof IOException);
+    }
+
+    Assert.assertEquals(4, counter.get());
+
+    provider.performFailover(provider.getProxy().proxy);
+    stats = provider.getProxy().proxy.getStats();
+    Assert.assertTrue(stats.length == 1);
+    Assert.assertEquals(2, stats[0]);
+
+    // Counter shuodl update only once
+    Assert.assertEquals(5, counter.get());
+
+    stats = provider.getProxy().proxy.getStats();
+    Assert.assertTrue(stats.length == 1);
+    Assert.assertEquals(2, stats[0]);
+
+    // Counter updates only once now
+    Assert.assertEquals(6, counter.get());
+
+    // Flip back to old active.. so now this should fail
+    isGood[0] = 1;
+    try {
+      provider.getProxy().proxy.getStats();
+      Assert.fail("Should fail since previously successful proxy now fails ");
+    } catch (Exception ex) {
+      Assert.assertTrue(ex instanceof IOException);
+    }
+
+    Assert.assertEquals(7, counter.get());
+
+    provider.performFailover(provider.getProxy().proxy);
+    stats = provider.getProxy().proxy.getStats();
+    Assert.assertTrue(stats.length == 1);
+    // Ensure correct proxy was called
+    Assert.assertEquals(1, stats[0]);
+  }
+
+  @Test
+  public void testPerformFailoverWith3Proxies() throws Exception {
+    conf.set(DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX + "." + ns,
+            "nn1,nn2,nn3");
+    conf.set(DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY + "." + ns + ".nn3",
+            "machine3.foo.bar:8020");
+
+    final AtomicInteger counter = new AtomicInteger(0);
+    final int[] isGood = {1};
+    final NamenodeProtocols goodMock = Mockito.mock(NamenodeProtocols.class);
+    Mockito.when(goodMock.getStats()).thenAnswer(new Answer<long[]>() {
+      @Override
+      public long[] answer(InvocationOnMock invocation) throws Throwable {
+        counter.incrementAndGet();
+        if (isGood[0] == 1) {
+          Thread.sleep(1000);
+          return new long[]{1};
+        }
+        throw new IOException("Was Good mock !!");
+      }
+    });
+    final NamenodeProtocols badMock = Mockito.mock(NamenodeProtocols.class);
+    Mockito.when(badMock.getStats()).thenAnswer(new Answer<long[]>() {
+      @Override
+      public long[] answer(InvocationOnMock invocation) throws Throwable {
+        counter.incrementAndGet();
+        if (isGood[0] == 2) {
+          Thread.sleep(1000);
+          return new long[]{2};
+        }
+        throw new IOException("Bad mock !!");
+      }
+    });
+    final NamenodeProtocols worseMock = Mockito.mock(NamenodeProtocols.class);
+    Mockito.when(worseMock.getStats()).thenAnswer(new Answer<long[]>() {
+      @Override
+      public long[] answer(InvocationOnMock invocation) throws Throwable {
+        counter.incrementAndGet();
+        if (isGood[0] == 3) {
+          Thread.sleep(1000);
+          return new long[]{3};
+        }
+        throw new IOException("Worse mock !!");
+      }
+    });
+
+    RequestHedgingProxyProvider<NamenodeProtocols> provider =
+            new RequestHedgingProxyProvider<>(conf, nnUri, NamenodeProtocols.class,
+                    createFactory(goodMock, badMock, worseMock));
+    long[] stats = provider.getProxy().proxy.getStats();
+    Assert.assertTrue(stats.length == 1);
+    Assert.assertEquals(1, stats[0]);
+    Assert.assertEquals(3, counter.get());
+    Mockito.verify(badMock).getStats();
+    Mockito.verify(goodMock).getStats();
+    Mockito.verify(worseMock).getStats();
+
+    stats = provider.getProxy().proxy.getStats();
+    Assert.assertTrue(stats.length == 1);
+    Assert.assertEquals(1, stats[0]);
+    // Ensure only the previous successful one is invoked
+    Mockito.verifyNoMoreInteractions(badMock);
+    Mockito.verifyNoMoreInteractions(worseMock);
+    Assert.assertEquals(4, counter.get());
+
+    // Flip to standby.. so now this should fail
+    isGood[0] = 2;
+    try {
+      provider.getProxy().proxy.getStats();
+      Assert.fail("Should fail since previously successful proxy now fails ");
+    } catch (Exception ex) {
+      Assert.assertTrue(ex instanceof IOException);
+    }
+
+    Assert.assertEquals(5, counter.get());
+
+    provider.performFailover(provider.getProxy().proxy);
+    stats = provider.getProxy().proxy.getStats();
+    Assert.assertTrue(stats.length == 1);
+    Assert.assertEquals(2, stats[0]);
+
+    // Counter updates twice since both proxies are tried on failure
+    Assert.assertEquals(7, counter.get());
+
+    stats = provider.getProxy().proxy.getStats();
+    Assert.assertTrue(stats.length == 1);
+    Assert.assertEquals(2, stats[0]);
+
+    // Counter updates only once now
+    Assert.assertEquals(8, counter.get());
+
+    // Flip to Other standby.. so now this should fail
+    isGood[0] = 3;
+    try {
+      provider.getProxy().proxy.getStats();
+      Assert.fail("Should fail since previously successful proxy now fails ");
+    } catch (Exception ex) {
+      Assert.assertTrue(ex instanceof IOException);
+    }
+
+    // Counter should ipdate only 1 time
+    Assert.assertEquals(9, counter.get());
+
+    provider.performFailover(provider.getProxy().proxy);
+    stats = provider.getProxy().proxy.getStats();
+    Assert.assertTrue(stats.length == 1);
+
+    // Ensure correct proxy was called
+    Assert.assertEquals(3, stats[0]);
+
+    // Counter updates twice since both proxies are tried on failure
+    Assert.assertEquals(11, counter.get());
+
+    stats = provider.getProxy().proxy.getStats();
+    Assert.assertTrue(stats.length == 1);
+    Assert.assertEquals(3, stats[0]);
+
+    // Counter updates only once now
+    Assert.assertEquals(12, counter.get());
+  }
+
+  private ProxyFactory<NamenodeProtocols> createFactory(
+      NamenodeProtocols... protos) {
+    final Iterator<NamenodeProtocols> iterator =
+        Lists.newArrayList(protos).iterator();
+    return new ProxyFactory<NamenodeProtocols>() {
+      @Override
+      public NamenodeProtocols createProxy(Configuration conf,
+          InetSocketAddress nnAddr, Class<NamenodeProtocols> xface,
+          UserGroupInformation ugi, boolean withRetries,
+          AtomicBoolean fallbackToSimpleAuth) throws IOException {
+        return iterator.next();
+      }
+    };
+  }
+}