瀏覽代碼

HADOOP-7380. Add client failover functionality to o.a.h.io.(ipc|retry). Contributed by Aaron T. Myers

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1144043 13f79535-47bb-0310-9956-ffa450edef68
Eli Collins 14 年之前
父節點
當前提交
637cdaefc2

+ 3 - 0
common/CHANGES.txt

@@ -53,6 +53,9 @@ Trunk (unreleased changes)
     HADOOP-7329. Add the capability of getting invividual attribute of a mbean
     using JMXProxyServlet. (tanping)
 
+    HADOOP-7380. Add client failover functionality to o.a.h.io.(ipc|retry).
+    (atm via eli)
+
   IMPROVEMENTS
 
     HADOOP-7042. Updates to test-patch.sh to include failed test names and

+ 52 - 0
common/src/java/org/apache/hadoop/io/retry/DefaultFailoverProxyProvider.java

@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.io.retry;
+
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * An implementation of {@link FailoverProxyProvider} which does nothing in the
+ * event of failover, and always returns the same proxy object. 
+ */
+@InterfaceStability.Evolving
+public class DefaultFailoverProxyProvider implements FailoverProxyProvider {
+  
+  private Object proxy;
+  private Class<?> iface;
+  
+  public DefaultFailoverProxyProvider(Class<?> iface, Object proxy) {
+    this.proxy = proxy;
+    this.iface = iface;
+  }
+
+  @Override
+  public Class<?> getInterface() {
+    return iface;
+  }
+
+  @Override
+  public Object getProxy() {
+    return proxy;
+  }
+
+  @Override
+  public void performFailover(Object currentProxy) {
+    // Nothing to do.
+  }
+
+}

+ 60 - 0
common/src/java/org/apache/hadoop/io/retry/FailoverProxyProvider.java

@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.io.retry;
+
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * An implementer of this interface is capable of providing proxy objects for
+ * use in IPC communication, and potentially modifying these objects or creating
+ * entirely new ones in the event of certain types of failures. The
+ * determination of whether or not to fail over is handled by
+ * {@link RetryPolicy}.
+ */
+@InterfaceStability.Evolving
+public interface FailoverProxyProvider {
+
+  /**
+   * Get the proxy object which should be used until the next failover event
+   * occurs.
+   * 
+   * @return the proxy object to invoke methods upon
+   */
+  public Object getProxy();
+
+  /**
+   * Called whenever the associated {@link RetryPolicy} determines that an error
+   * warrants failing over.
+   * 
+   * @param currentProxy the proxy object which was being used before this
+   *        failover event
+   */
+  public void performFailover(Object currentProxy);
+
+  /**
+   * Return a reference to the interface this provider's proxy objects actually
+   * implement. If any of the methods on this interface are annotated as being
+   * {@link Idempotent}, then this fact will be passed to the
+   * {@link RetryPolicy#shouldRetry(Exception, int, int, boolean)} method on
+   * error, for use in determining whether or not failover should be attempted.
+   * 
+   * @return the interface implemented by the proxy objects returned by
+   *         {@link FailoverProxyProvider#getProxy()}
+   */
+  public Class<?> getInterface();
+}

+ 35 - 0
common/src/java/org/apache/hadoop/io/retry/Idempotent.java

@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.io.retry;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Inherited;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Used to mark certain methods of an interface as being idempotent, and
+ * therefore warrant being retried on failover.
+ */
+@Inherited
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.METHOD)
+@InterfaceStability.Evolving
+public @interface Idempotent {}

+ 29 - 13
common/src/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java

@@ -25,25 +25,30 @@ import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.io.retry.RetryPolicy.RetryAction;
 
 class RetryInvocationHandler implements InvocationHandler {
   public static final Log LOG = LogFactory.getLog(RetryInvocationHandler.class);
-  private Object implementation;
+  private FailoverProxyProvider proxyProvider;
   
   private RetryPolicy defaultPolicy;
   private Map<String,RetryPolicy> methodNameToPolicyMap;
+  private Object currentProxy;
   
-  public RetryInvocationHandler(Object implementation, RetryPolicy retryPolicy) {
-    this.implementation = implementation;
+  public RetryInvocationHandler(FailoverProxyProvider proxyProvider,
+      RetryPolicy retryPolicy) {
+    this.proxyProvider = proxyProvider;
     this.defaultPolicy = retryPolicy;
     this.methodNameToPolicyMap = Collections.emptyMap();
+    this.currentProxy = proxyProvider.getProxy();
   }
   
-  public RetryInvocationHandler(Object implementation, Map<String, RetryPolicy> methodNameToPolicyMap) {
-    this.implementation = implementation;
+  public RetryInvocationHandler(FailoverProxyProvider proxyProvider,
+      Map<String, RetryPolicy> methodNameToPolicyMap) {
+    this.proxyProvider = proxyProvider;
     this.defaultPolicy = RetryPolicies.TRY_ONCE_THEN_FAIL;
     this.methodNameToPolicyMap = methodNameToPolicyMap;
+    this.currentProxy = proxyProvider.getProxy();
   }
 
   public Object invoke(Object proxy, Method method, Object[] args)
@@ -53,24 +58,35 @@ class RetryInvocationHandler implements InvocationHandler {
       policy = defaultPolicy;
     }
     
+    int failovers = 0;
     int retries = 0;
     while (true) {
       try {
         return invokeMethod(method, args);
       } catch (Exception e) {
-        if (!policy.shouldRetry(e, retries++)) {
-          LOG.info("Exception while invoking " + method.getName()
-                   + " of " + implementation.getClass() + ". Not retrying."
-                   , e);
+        boolean isMethodIdempotent = proxyProvider.getInterface()
+            .getMethod(method.getName(), method.getParameterTypes())
+            .isAnnotationPresent(Idempotent.class);
+        RetryAction action = policy.shouldRetry(e, retries++, failovers,
+            isMethodIdempotent);
+        if (action == RetryAction.FAIL) {
+          LOG.warn("Exception while invoking " + method.getName()
+                   + " of " + currentProxy.getClass() + ". Not retrying.", e);
           if (!method.getReturnType().equals(Void.TYPE)) {
             throw e; // non-void methods can't fail without an exception
           }
           return null;
+        } else if (action == RetryAction.FAILOVER_AND_RETRY) {
+          LOG.warn("Exception while invoking " + method.getName()
+              + " of " + currentProxy.getClass()
+              + ". Trying to fail over.", e);
+          failovers++;
+          proxyProvider.performFailover(currentProxy);
+          currentProxy = proxyProvider.getProxy();
         }
         if(LOG.isDebugEnabled()) {
           LOG.debug("Exception while invoking " + method.getName()
-              + " of " + implementation.getClass() + ". Retrying."
-              , e);
+              + " of " + currentProxy.getClass() + ". Retrying.", e);
         }
       }
     }
@@ -81,7 +97,7 @@ class RetryInvocationHandler implements InvocationHandler {
       if (!method.isAccessible()) {
         method.setAccessible(true);
       }
-      return method.invoke(implementation, args);
+      return method.invoke(currentProxy, args);
     } catch (InvocationTargetException e) {
       throw e.getCause();
     }

+ 87 - 12
common/src/java/org/apache/hadoop/io/retry/RetryPolicies.java

@@ -17,14 +17,21 @@
  */
 package org.apache.hadoop.io.retry;
 
+import java.io.IOException;
+import java.net.ConnectException;
+import java.net.NoRouteToHostException;
+import java.net.SocketException;
+import java.net.UnknownHostException;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Random;
-import java.util.Set;
 import java.util.Map.Entry;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.ipc.StandbyException;
 
 /**
  * <p>
@@ -33,6 +40,8 @@ import org.apache.hadoop.ipc.RemoteException;
  */
 public class RetryPolicies {
   
+  public static final Log LOG = LogFactory.getLog(RetryPolicies.class);
+  
   /**
    * <p>
    * Try once, and fail by re-throwing the exception.
@@ -122,20 +131,32 @@ public class RetryPolicies {
     return new RemoteExceptionDependentRetry(defaultPolicy, exceptionToPolicyMap);
   }
   
+  public static final RetryPolicy failoverOnNetworkException(int maxFailovers) {
+    return failoverOnNetworkException(TRY_ONCE_THEN_FAIL, maxFailovers);
+  }
+  
+  public static final RetryPolicy failoverOnNetworkException(
+      RetryPolicy fallbackPolicy, int maxFailovers) {
+    return new FailoverOnNetworkExceptionRetry(fallbackPolicy, maxFailovers);
+  }
+  
   static class TryOnceThenFail implements RetryPolicy {
-    public boolean shouldRetry(Exception e, int retries) throws Exception {
+    public RetryAction shouldRetry(Exception e, int retries, int failovers,
+        boolean isMethodIdempotent) throws Exception {
       throw e;
     }
   }
   static class TryOnceDontFail implements RetryPolicy {
-    public boolean shouldRetry(Exception e, int retries) throws Exception {
-      return false;
+    public RetryAction shouldRetry(Exception e, int retries, int failovers,
+        boolean isMethodIdempotent) throws Exception {
+      return RetryAction.FAIL;
     }
   }
   
   static class RetryForever implements RetryPolicy {
-    public boolean shouldRetry(Exception e, int retries) throws Exception {
-      return true;
+    public RetryAction shouldRetry(Exception e, int retries, int failovers,
+        boolean isMethodIdempotent) throws Exception {
+      return RetryAction.RETRY;
     }
   }
   
@@ -150,7 +171,8 @@ public class RetryPolicies {
       this.timeUnit = timeUnit;
     }
 
-    public boolean shouldRetry(Exception e, int retries) throws Exception {
+    public RetryAction shouldRetry(Exception e, int retries, int failovers,
+        boolean isMethodIdempotent) throws Exception {
       if (retries >= maxRetries) {
         throw e;
       }
@@ -159,7 +181,7 @@ public class RetryPolicies {
       } catch (InterruptedException ie) {
         // retry
       }
-      return true;
+      return RetryAction.RETRY;
     }
     
     protected abstract long calculateSleepTime(int retries);
@@ -204,12 +226,13 @@ public class RetryPolicies {
       this.exceptionToPolicyMap = exceptionToPolicyMap;
     }
 
-    public boolean shouldRetry(Exception e, int retries) throws Exception {
+    public RetryAction shouldRetry(Exception e, int retries, int failovers,
+        boolean isMethodIdempotent) throws Exception {
       RetryPolicy policy = exceptionToPolicyMap.get(e.getClass());
       if (policy == null) {
         policy = defaultPolicy;
       }
-      return policy.shouldRetry(e, retries);
+      return policy.shouldRetry(e, retries, failovers, isMethodIdempotent);
     }
     
   }
@@ -230,7 +253,8 @@ public class RetryPolicies {
       }
     }
 
-    public boolean shouldRetry(Exception e, int retries) throws Exception {
+    public RetryAction shouldRetry(Exception e, int retries, int failovers,
+        boolean isMethodIdempotent) throws Exception {
       RetryPolicy policy = null;
       if (e instanceof RemoteException) {
         policy = exceptionNameToPolicyMap.get(
@@ -239,7 +263,7 @@ public class RetryPolicies {
       if (policy == null) {
         policy = defaultPolicy;
       }
-      return policy.shouldRetry(e, retries);
+      return policy.shouldRetry(e, retries, failovers, isMethodIdempotent);
     }
   }
   
@@ -255,4 +279,55 @@ public class RetryPolicies {
       return sleepTime*r.nextInt(1<<(retries+1));
     }
   }
+  
+  /*
+   * Fail over and retry in the case of:
+   *   Remote StandbyException (server is up, but is not the active server)
+   *   Immediate socket exceptions (e.g. no route to host, econnrefused)
+   *   Socket exceptions after initial connection when operation is idempotent
+   * 
+   * Fail immediately in the case of:
+   *   Socket exceptions after initial connection when operation is not idempotent
+   * 
+   * Fall back on underlying retry policy otherwise.
+   */
+  static class FailoverOnNetworkExceptionRetry implements RetryPolicy {
+    
+    private RetryPolicy fallbackPolicy;
+    private int maxFailovers;
+    
+    public FailoverOnNetworkExceptionRetry(RetryPolicy fallbackPolicy,
+        int maxFailovers) {
+      this.fallbackPolicy = fallbackPolicy;
+      this.maxFailovers = maxFailovers;
+    }
+
+    @Override
+    public RetryAction shouldRetry(Exception e, int retries,
+        int failovers, boolean isMethodIdempotent) throws Exception {
+      if (failovers >= maxFailovers) {
+        LOG.info("Failovers (" + failovers + ") exceeded maximum allowed ("
+            + maxFailovers + ")");
+        return RetryAction.FAIL;
+      }
+      
+      if (e instanceof ConnectException ||
+          e instanceof NoRouteToHostException ||
+          e instanceof UnknownHostException ||
+          e instanceof StandbyException) {
+        return RetryAction.FAILOVER_AND_RETRY;
+      } else if (e instanceof SocketException ||
+                 e instanceof IOException) {
+        if (isMethodIdempotent) {
+          return RetryAction.FAILOVER_AND_RETRY;
+        } else {
+          return RetryAction.FAIL;
+        }
+      } else {
+        return fallbackPolicy.shouldRetry(e, retries, failovers,
+            isMethodIdempotent);
+      }
+    }
+    
+  }
 }

+ 26 - 5
common/src/java/org/apache/hadoop/io/retry/RetryPolicy.java

@@ -17,13 +17,28 @@
  */
 package org.apache.hadoop.io.retry;
 
+import org.apache.hadoop.classification.InterfaceStability;
+
+
 /**
  * <p>
  * Specifies a policy for retrying method failures.
  * Implementations of this interface should be immutable.
  * </p>
  */
+@InterfaceStability.Evolving
 public interface RetryPolicy {
+  
+  /**
+   * Returned by {@link RetryPolicy#shouldRetry(Exception, int, int, boolean)}.
+   */
+  @InterfaceStability.Evolving
+  public enum RetryAction {
+    FAIL,
+    RETRY,
+    FAILOVER_AND_RETRY
+  }
+  
   /**
    * <p>
    * Determines whether the framework should retry a
@@ -31,13 +46,19 @@ public interface RetryPolicy {
    * of retries that have been made for that operation
    * so far.
    * </p>
-   * @param e The exception that caused the method to fail.
-   * @param retries The number of times the method has been retried.
+   * @param e The exception that caused the method to fail
+   * @param retries The number of times the method has been retried
+   * @param failovers The number of times the method has failed over to a
+   *   different backend implementation
+   * @param isMethodIdempotent <code>true</code> if the method is idempotent
+   *   and so can reasonably be retried on failover when we don't know if the
+   *   previous attempt reached the server or not
    * @return <code>true</code> if the method should be retried,
    *   <code>false</code> if the method should not be retried
-   *   but shouldn't fail with an exception (only for void methods).
+   *   but shouldn't fail with an exception (only for void methods)
    * @throws Exception The re-thrown exception <code>e</code> indicating
-   *   that the method failed and should not be retried further. 
+   *   that the method failed and should not be retried further
    */
-  public boolean shouldRetry(Exception e, int retries) throws Exception;
+  public RetryAction shouldRetry(Exception e, int retries, int failovers,
+      boolean isMethodIdempotent) throws Exception;
 }

+ 46 - 12
common/src/java/org/apache/hadoop/io/retry/RetryProxy.java

@@ -33,25 +33,41 @@ public class RetryProxy {
    * </p>
    * @param iface the interface that the retry will implement
    * @param implementation the instance whose methods should be retried
-   * @param retryPolicy the policy for retirying method call failures
+   * @param retryPolicy the policy for retrying method call failures
    * @return the retry proxy
    */
   public static Object create(Class<?> iface, Object implementation,
                               RetryPolicy retryPolicy) {
+    return RetryProxy.create(iface,
+        new DefaultFailoverProxyProvider(iface, implementation),
+        retryPolicy);
+  }
+
+  /**
+   * Create a proxy for an interface of implementations of that interface using
+   * the given {@link FailoverProxyProvider} and the same retry policy for each
+   * method in the interface.
+   * 
+   * @param iface the interface that the retry will implement
+   * @param proxyProvider provides implementation instances whose methods should be retried
+   * @param retryPolicy the policy for retrying or failing over method call failures
+   * @return the retry proxy
+   */
+  public static Object create(Class<?> iface, FailoverProxyProvider proxyProvider,
+      RetryPolicy retryPolicy) {
     return Proxy.newProxyInstance(
-                                  implementation.getClass().getClassLoader(),
-                                  new Class<?>[] { iface },
-                                  new RetryInvocationHandler(implementation, retryPolicy)
-                                  );
-  }  
+        proxyProvider.getInterface().getClassLoader(),
+        new Class<?>[] { iface },
+        new RetryInvocationHandler(proxyProvider, retryPolicy)
+        );
+  }
   
   /**
-   * <p>
    * Create a proxy for an interface of an implementation class
    * using the a set of retry policies specified by method name.
    * If no retry policy is defined for a method then a default of
    * {@link RetryPolicies#TRY_ONCE_THEN_FAIL} is used.
-   * </p>
+   * 
    * @param iface the interface that the retry will implement
    * @param implementation the instance whose methods should be retried
    * @param methodNameToPolicyMap a map of method names to retry policies
@@ -59,10 +75,28 @@ public class RetryProxy {
    */
   public static Object create(Class<?> iface, Object implementation,
                               Map<String,RetryPolicy> methodNameToPolicyMap) {
+    return RetryProxy.create(iface,
+        new DefaultFailoverProxyProvider(iface, implementation),
+        methodNameToPolicyMap);
+  }
+
+  /**
+   * Create a proxy for an interface of implementations of that interface using
+   * the given {@link FailoverProxyProvider} and the a set of retry policies
+   * specified by method name. If no retry policy is defined for a method then a
+   * default of {@link RetryPolicies#TRY_ONCE_THEN_FAIL} is used.
+   * 
+   * @param iface the interface that the retry will implement
+   * @param proxyProvider provides implementation instances whose methods should be retried
+   * @param methodNameToPolicyMapa map of method names to retry policies
+   * @return the retry proxy
+   */
+  public static Object create(Class<?> iface, FailoverProxyProvider proxyProvider,
+      Map<String,RetryPolicy> methodNameToPolicyMap) {
     return Proxy.newProxyInstance(
-                                  implementation.getClass().getClassLoader(),
-                                  new Class<?>[] { iface },
-                                  new RetryInvocationHandler(implementation, methodNameToPolicyMap)
-                                  );
+        proxyProvider.getInterface().getClassLoader(),
+        new Class<?>[] { iface },
+        new RetryInvocationHandler(proxyProvider, methodNameToPolicyMap)
+        );
   }
 }

+ 32 - 0
common/src/java/org/apache/hadoop/ipc/StandbyException.java

@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ipc;
+
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Thrown by a remote server when it is up, but is not the active server in a
+ * set of servers in which only a subset may be active.
+ */
+@InterfaceStability.Evolving
+public class StandbyException extends Exception {
+  static final long serialVersionUID = 0x12308AD010L;
+  public StandbyException(String msg) {
+    super(msg);
+  }
+}

+ 184 - 0
common/src/test/core/org/apache/hadoop/io/retry/TestFailoverProxy.java

@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.io.retry;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.retry.UnreliableImplementation.TypeOfExceptionToFailWith;
+import org.apache.hadoop.io.retry.UnreliableInterface.UnreliableException;
+import org.apache.hadoop.ipc.StandbyException;
+import org.junit.Test;
+
+public class TestFailoverProxy {
+
+  public static class FlipFlopProxyProvider implements FailoverProxyProvider {
+    
+    private Class<?> iface;
+    private Object currentlyActive;
+    private Object impl1;
+    private Object impl2;
+    
+    public FlipFlopProxyProvider(Class<?> iface, Object activeImpl,
+        Object standbyImpl) {
+      this.iface = iface;
+      this.impl1 = activeImpl;
+      this.impl2 = standbyImpl;
+      currentlyActive = impl1;
+    }
+    
+    @Override
+    public Object getProxy() {
+      return currentlyActive;
+    }
+
+    @Override
+    public void performFailover(Object currentProxy) {
+      currentlyActive = impl1 == currentProxy ? impl2 : impl1;
+    }
+
+    @Override
+    public Class<?> getInterface() {
+      return iface;
+    }
+    
+  }
+  
+  public static class FailOverOnceOnAnyExceptionPolicy implements RetryPolicy {
+
+    @Override
+    public RetryAction shouldRetry(Exception e, int retries, int failovers,
+        boolean isMethodIdempotent) {
+      return failovers < 1 ? RetryAction.FAILOVER_AND_RETRY : RetryAction.FAIL;
+    }
+    
+  }
+  
+  @Test
+  public void testSuccedsOnceThenFailOver() throws UnreliableException,
+      IOException, StandbyException {
+    UnreliableInterface unreliable = (UnreliableInterface)RetryProxy
+        .create(UnreliableInterface.class,
+            new FlipFlopProxyProvider(UnreliableInterface.class,
+              new UnreliableImplementation("impl1"),
+              new UnreliableImplementation("impl2")),
+            new FailOverOnceOnAnyExceptionPolicy());
+    
+    assertEquals("impl1", unreliable.succeedsOnceThenFailsReturningString());
+    assertEquals("impl2", unreliable.succeedsOnceThenFailsReturningString());
+    try {
+      unreliable.succeedsOnceThenFailsReturningString();
+      fail("should not have succeeded more than twice");
+    } catch (UnreliableException e) {
+      // expected
+    }
+  }
+  
+  @Test
+  public void testSucceedsTenTimesThenFailOver() throws UnreliableException,
+      IOException, StandbyException {
+    UnreliableInterface unreliable = (UnreliableInterface)RetryProxy
+        .create(UnreliableInterface.class,
+            new FlipFlopProxyProvider(UnreliableInterface.class,
+              new UnreliableImplementation("impl1"),
+              new UnreliableImplementation("impl2")),
+            new FailOverOnceOnAnyExceptionPolicy());
+    
+    for (int i = 0; i < 10; i++) {
+      assertEquals("impl1", unreliable.succeedsTenTimesThenFailsReturningString());
+    }
+    assertEquals("impl2", unreliable.succeedsTenTimesThenFailsReturningString());
+  }
+  
+  @Test
+  public void testNeverFailOver() throws UnreliableException,
+      IOException, StandbyException {
+    UnreliableInterface unreliable = (UnreliableInterface)RetryProxy
+    .create(UnreliableInterface.class,
+        new FlipFlopProxyProvider(UnreliableInterface.class,
+          new UnreliableImplementation("impl1"),
+          new UnreliableImplementation("impl2")),
+        RetryPolicies.TRY_ONCE_DONT_FAIL);
+
+    unreliable.succeedsOnceThenFailsReturningString();
+    try {
+      unreliable.succeedsOnceThenFailsReturningString();
+      fail("should not have succeeded twice");
+    } catch (UnreliableException e) {
+      assertEquals("impl1", e.getMessage());
+    }
+  }
+  
+  @Test
+  public void testFailoverOnStandbyException()
+      throws UnreliableException, IOException, StandbyException {
+    UnreliableInterface unreliable = (UnreliableInterface)RetryProxy
+    .create(UnreliableInterface.class,
+        new FlipFlopProxyProvider(UnreliableInterface.class,
+          new UnreliableImplementation("impl1"),
+          new UnreliableImplementation("impl2")),
+        RetryPolicies.failoverOnNetworkException(1));
+    
+    assertEquals("impl1", unreliable.succeedsOnceThenFailsReturningString());
+    try {
+      unreliable.succeedsOnceThenFailsReturningString();
+      fail("should not have succeeded twice");
+    } catch (UnreliableException e) {
+      // Make sure there was no failover on normal exception.
+      assertEquals("impl1", e.getMessage());
+    }
+    
+    unreliable = (UnreliableInterface)RetryProxy
+    .create(UnreliableInterface.class,
+        new FlipFlopProxyProvider(UnreliableInterface.class,
+          new UnreliableImplementation("impl1", TypeOfExceptionToFailWith.STANDBY_EXCEPTION),
+          new UnreliableImplementation("impl2", TypeOfExceptionToFailWith.UNRELIABLE_EXCEPTION)),
+        RetryPolicies.failoverOnNetworkException(1));
+    
+    assertEquals("impl1", unreliable.succeedsOnceThenFailsReturningString());
+    // Make sure we fail over since the first implementation threw a StandbyException
+    assertEquals("impl2", unreliable.succeedsOnceThenFailsReturningString());
+  }
+  
+  @Test
+  public void testFailoverOnNetworkExceptionIdempotentOperation()
+      throws UnreliableException, IOException, StandbyException {
+    UnreliableInterface unreliable = (UnreliableInterface)RetryProxy
+    .create(UnreliableInterface.class,
+        new FlipFlopProxyProvider(UnreliableInterface.class,
+          new UnreliableImplementation("impl1", TypeOfExceptionToFailWith.IO_EXCEPTION),
+          new UnreliableImplementation("impl2", TypeOfExceptionToFailWith.UNRELIABLE_EXCEPTION)),
+        RetryPolicies.failoverOnNetworkException(1));
+    
+    assertEquals("impl1", unreliable.succeedsOnceThenFailsReturningString());
+    try {
+      assertEquals("impl2", unreliable.succeedsOnceThenFailsReturningString());
+      fail("should not have succeeded twice");
+    } catch (IOException e) {
+      // Make sure we *don't* fail over since the first implementation threw an
+      // IOException and this method is not idempotent
+      assertEquals("impl1", e.getMessage());
+    }
+    
+    assertEquals("impl1", unreliable.succeedsOnceThenFailsReturningStringIdempotent());
+    // Make sure we fail over since the first implementation threw an
+    // IOException and this method is idempotent.
+    assertEquals("impl2", unreliable.succeedsOnceThenFailsReturningStringIdempotent());
+  }
+}

+ 86 - 2
common/src/test/core/org/apache/hadoop/io/retry/UnreliableImplementation.java

@@ -15,16 +15,44 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.hadoop.io.retry;
 
+import java.io.IOException;
+
 import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.ipc.StandbyException;
 
 public class UnreliableImplementation implements UnreliableInterface {
 
   private int failsOnceInvocationCount,
     failsOnceWithValueInvocationCount,
-    failsTenTimesInvocationCount;
+    failsTenTimesInvocationCount,
+    succeedsOnceThenFailsCount,
+    succeedsOnceThenFailsIdempotentCount,
+    succeedsTenTimesThenFailsCount;
+  
+  private String identifier;
+  private TypeOfExceptionToFailWith exceptionToFailWith;
+  
+  public static enum TypeOfExceptionToFailWith {
+    UNRELIABLE_EXCEPTION,
+    STANDBY_EXCEPTION,
+    IO_EXCEPTION
+  }
+  
+  public UnreliableImplementation() {
+    this(null);
+  }
+  
+  public UnreliableImplementation(String identifier) {
+    this(identifier, TypeOfExceptionToFailWith.UNRELIABLE_EXCEPTION);
+  }
+  
+  public UnreliableImplementation(String identifier,
+      TypeOfExceptionToFailWith exceptionToFailWith) {
+    this.identifier = identifier;
+    this.exceptionToFailWith = exceptionToFailWith;
+  }
   
   public void alwaysSucceeds() {
     // do nothing
@@ -57,4 +85,60 @@ public class UnreliableImplementation implements UnreliableInterface {
     }
   }
 
+  @Override
+  public String succeedsOnceThenFailsReturningString()
+      throws UnreliableException, IOException, StandbyException {
+    if (succeedsOnceThenFailsCount++ < 1) {
+      return identifier;
+    } else {
+      switch (exceptionToFailWith) {
+      case STANDBY_EXCEPTION:
+        throw new StandbyException(identifier);
+      case UNRELIABLE_EXCEPTION:
+        throw new UnreliableException(identifier);
+      case IO_EXCEPTION:
+        throw new IOException(identifier);
+      }
+      return null;
+    }
+  }
+
+  @Override
+  public String succeedsTenTimesThenFailsReturningString()
+      throws UnreliableException, IOException, StandbyException {
+    if (succeedsTenTimesThenFailsCount++ < 10) {
+      return identifier;
+    } else {
+      switch (exceptionToFailWith) {
+      case STANDBY_EXCEPTION:
+        throw new StandbyException(identifier);
+      case UNRELIABLE_EXCEPTION:
+        throw new UnreliableException(identifier);
+      case IO_EXCEPTION:
+        throw new IOException(identifier);
+      default:
+        throw new RuntimeException(identifier);
+      }
+    }
+  }
+
+  @Override
+  public String succeedsOnceThenFailsReturningStringIdempotent()
+      throws UnreliableException, StandbyException, IOException {
+    if (succeedsOnceThenFailsIdempotentCount++ < 1) {
+      return identifier;
+    } else {
+      switch (exceptionToFailWith) {
+      case STANDBY_EXCEPTION:
+        throw new StandbyException(identifier);
+      case UNRELIABLE_EXCEPTION:
+        throw new UnreliableException(identifier);
+      case IO_EXCEPTION:
+        throw new IOException(identifier);
+      default:
+        throw new RuntimeException(identifier);
+      }
+    }
+  }
+
 }

+ 25 - 1
common/src/test/core/org/apache/hadoop/io/retry/UnreliableInterface.java

@@ -18,12 +18,28 @@
 
 package org.apache.hadoop.io.retry;
 
+import java.io.IOException;
+
 import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.ipc.StandbyException;
 
 public interface UnreliableInterface {
   
   public static class UnreliableException extends Exception {
-    // no body
+    private String identifier;
+    
+    public UnreliableException() {
+      // no body
+    }
+    
+    public UnreliableException(String identifier) {
+      this.identifier = identifier;
+    }
+    
+    @Override
+    public String getMessage() {
+      return identifier;
+    }
   }
   
   public static class FatalException extends UnreliableException {
@@ -39,4 +55,12 @@ public interface UnreliableInterface {
   boolean failsOnceThenSucceedsWithReturnValue() throws UnreliableException;
 
   void failsTenTimesThenSucceeds() throws UnreliableException;
+  
+  public String succeedsOnceThenFailsReturningString()
+      throws UnreliableException, StandbyException, IOException;
+  @Idempotent
+  public String succeedsOnceThenFailsReturningStringIdempotent()
+      throws UnreliableException, StandbyException, IOException;
+  public String succeedsTenTimesThenFailsReturningString()
+      throws UnreliableException, StandbyException, IOException;
 }