|
@@ -23,6 +23,7 @@ import java.lang.reflect.Method;
|
|
import java.lang.reflect.Proxy;
|
|
import java.lang.reflect.Proxy;
|
|
import java.util.Collections;
|
|
import java.util.Collections;
|
|
import java.util.Map;
|
|
import java.util.Map;
|
|
|
|
+import java.util.concurrent.atomic.AtomicLong;
|
|
|
|
|
|
import org.apache.commons.logging.Log;
|
|
import org.apache.commons.logging.Log;
|
|
import org.apache.commons.logging.LogFactory;
|
|
import org.apache.commons.logging.LogFactory;
|
|
@@ -33,6 +34,11 @@ import org.apache.hadoop.ipc.RpcInvocationHandler;
|
|
class RetryInvocationHandler implements RpcInvocationHandler {
|
|
class RetryInvocationHandler implements RpcInvocationHandler {
|
|
public static final Log LOG = LogFactory.getLog(RetryInvocationHandler.class);
|
|
public static final Log LOG = LogFactory.getLog(RetryInvocationHandler.class);
|
|
private FailoverProxyProvider proxyProvider;
|
|
private FailoverProxyProvider proxyProvider;
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * The number of times the associated proxyProvider has ever been failed over.
|
|
|
|
+ */
|
|
|
|
+ private long proxyProviderFailoverCount = 0;
|
|
|
|
|
|
private RetryPolicy defaultPolicy;
|
|
private RetryPolicy defaultPolicy;
|
|
private Map<String,RetryPolicy> methodNameToPolicyMap;
|
|
private Map<String,RetryPolicy> methodNameToPolicyMap;
|
|
@@ -61,16 +67,24 @@ class RetryInvocationHandler implements RpcInvocationHandler {
|
|
policy = defaultPolicy;
|
|
policy = defaultPolicy;
|
|
}
|
|
}
|
|
|
|
|
|
- int failovers = 0;
|
|
|
|
|
|
+ // The number of times this method invocation has been failed over.
|
|
|
|
+ int invocationFailoverCount = 0;
|
|
int retries = 0;
|
|
int retries = 0;
|
|
while (true) {
|
|
while (true) {
|
|
|
|
+ // The number of times this invocation handler has ever been failed over,
|
|
|
|
+ // before this method invocation attempt. Used to prevent concurrent
|
|
|
|
+ // failed method invocations from triggering multiple failover attempts.
|
|
|
|
+ long invocationAttemptFailoverCount;
|
|
|
|
+ synchronized (proxyProvider) {
|
|
|
|
+ invocationAttemptFailoverCount = proxyProviderFailoverCount;
|
|
|
|
+ }
|
|
try {
|
|
try {
|
|
return invokeMethod(method, args);
|
|
return invokeMethod(method, args);
|
|
} catch (Exception e) {
|
|
} catch (Exception e) {
|
|
boolean isMethodIdempotent = proxyProvider.getInterface()
|
|
boolean isMethodIdempotent = proxyProvider.getInterface()
|
|
.getMethod(method.getName(), method.getParameterTypes())
|
|
.getMethod(method.getName(), method.getParameterTypes())
|
|
.isAnnotationPresent(Idempotent.class);
|
|
.isAnnotationPresent(Idempotent.class);
|
|
- RetryAction action = policy.shouldRetry(e, retries++, failovers,
|
|
|
|
|
|
+ RetryAction action = policy.shouldRetry(e, retries++, invocationFailoverCount,
|
|
isMethodIdempotent);
|
|
isMethodIdempotent);
|
|
if (action == RetryAction.FAIL) {
|
|
if (action == RetryAction.FAIL) {
|
|
LOG.warn("Exception while invoking " + method.getName()
|
|
LOG.warn("Exception while invoking " + method.getName()
|
|
@@ -82,10 +96,24 @@ class RetryInvocationHandler implements RpcInvocationHandler {
|
|
} else if (action == RetryAction.FAILOVER_AND_RETRY) {
|
|
} else if (action == RetryAction.FAILOVER_AND_RETRY) {
|
|
LOG.warn("Exception while invoking " + method.getName()
|
|
LOG.warn("Exception while invoking " + method.getName()
|
|
+ " of " + currentProxy.getClass()
|
|
+ " of " + currentProxy.getClass()
|
|
- + ". Trying to fail over.", e);
|
|
|
|
- failovers++;
|
|
|
|
- proxyProvider.performFailover(currentProxy);
|
|
|
|
|
|
+ + " after " + invocationFailoverCount + " fail over attempts."
|
|
|
|
+ + " Trying to fail over.", e);
|
|
|
|
+ // Make sure that concurrent failed method invocations only cause a
|
|
|
|
+ // single actual fail over.
|
|
|
|
+ synchronized (proxyProvider) {
|
|
|
|
+ if (invocationAttemptFailoverCount == proxyProviderFailoverCount) {
|
|
|
|
+ proxyProvider.performFailover(currentProxy);
|
|
|
|
+ proxyProviderFailoverCount++;
|
|
|
|
+ } else {
|
|
|
|
+ LOG.warn("A failover has occurred since the start of this method"
|
|
|
|
+ + " invocation attempt.");
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ // The call to getProxy() could technically only be made in the event
|
|
|
|
+ // performFailover() is called, but it needs to be out here for the
|
|
|
|
+ // purpose of testing.
|
|
currentProxy = proxyProvider.getProxy();
|
|
currentProxy = proxyProvider.getProxy();
|
|
|
|
+ invocationFailoverCount++;
|
|
}
|
|
}
|
|
if(LOG.isDebugEnabled()) {
|
|
if(LOG.isDebugEnabled()) {
|
|
LOG.debug("Exception while invoking " + method.getName()
|
|
LOG.debug("Exception while invoking " + method.getName()
|