|
@@ -29,6 +29,7 @@ import java.util.concurrent.atomic.AtomicLong;
|
|
|
import org.apache.commons.logging.Log;
|
|
|
import org.apache.commons.logging.LogFactory;
|
|
|
import org.apache.hadoop.classification.InterfaceAudience;
|
|
|
+import org.apache.hadoop.io.retry.FailoverProxyProvider.ProxyInfo;
|
|
|
import org.apache.hadoop.io.retry.RetryPolicy.RetryAction;
|
|
|
import org.apache.hadoop.ipc.Client;
|
|
|
import org.apache.hadoop.ipc.Client.ConnectionId;
|
|
@@ -57,7 +58,7 @@ public class RetryInvocationHandler<T> implements RpcInvocationHandler {
|
|
|
|
|
|
private final RetryPolicy defaultPolicy;
|
|
|
private final Map<String,RetryPolicy> methodNameToPolicyMap;
|
|
|
- private T currentProxy;
|
|
|
+ private ProxyInfo<T> currentProxy;
|
|
|
|
|
|
protected RetryInvocationHandler(FailoverProxyProvider<T> proxyProvider,
|
|
|
RetryPolicy retryPolicy) {
|
|
@@ -83,7 +84,7 @@ public class RetryInvocationHandler<T> implements RpcInvocationHandler {
|
|
|
|
|
|
// The number of times this method invocation has been failed over.
|
|
|
int invocationFailoverCount = 0;
|
|
|
- final boolean isRpc = isRpcInvocation(currentProxy);
|
|
|
+ final boolean isRpc = isRpcInvocation(currentProxy.proxy);
|
|
|
final int callId = isRpc? Client.nextCallId(): RpcConstants.INVALID_CALL_ID;
|
|
|
int retries = 0;
|
|
|
while (true) {
|
|
@@ -115,9 +116,9 @@ public class RetryInvocationHandler<T> implements RpcInvocationHandler {
|
|
|
invocationFailoverCount, isIdempotentOrAtMostOnce);
|
|
|
if (action.action == RetryAction.RetryDecision.FAIL) {
|
|
|
if (action.reason != null) {
|
|
|
- LOG.warn("Exception while invoking " +
|
|
|
- currentProxy.getClass() + "." + method.getName() +
|
|
|
- ". Not retrying because " + action.reason, e);
|
|
|
+ LOG.warn("Exception while invoking " + currentProxy.proxy.getClass()
|
|
|
+ + "." + method.getName() + " over " + currentProxy.proxyInfo
|
|
|
+ + ". Not retrying because " + action.reason, e);
|
|
|
}
|
|
|
throw e;
|
|
|
} else { // retry or failover
|
|
@@ -130,7 +131,9 @@ public class RetryInvocationHandler<T> implements RpcInvocationHandler {
|
|
|
if (action.action == RetryAction.RetryDecision.FAILOVER_AND_RETRY &&
|
|
|
worthLogging) {
|
|
|
String msg = "Exception while invoking " + method.getName()
|
|
|
- + " of class " + currentProxy.getClass().getSimpleName();
|
|
|
+ + " of class " + currentProxy.proxy.getClass().getSimpleName()
|
|
|
+ + " over " + currentProxy.proxyInfo;
|
|
|
+
|
|
|
if (invocationFailoverCount > 0) {
|
|
|
msg += " after " + invocationFailoverCount + " fail over attempts";
|
|
|
}
|
|
@@ -141,8 +144,9 @@ public class RetryInvocationHandler<T> implements RpcInvocationHandler {
|
|
|
} else {
|
|
|
if(LOG.isDebugEnabled()) {
|
|
|
LOG.debug("Exception while invoking " + method.getName()
|
|
|
- + " of class " + currentProxy.getClass().getSimpleName() +
|
|
|
- ". Retrying " + formatSleepMessage(action.delayMillis), e);
|
|
|
+ + " of class " + currentProxy.proxy.getClass().getSimpleName()
|
|
|
+ + " over " + currentProxy.proxyInfo + ". Retrying "
|
|
|
+ + formatSleepMessage(action.delayMillis), e);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -155,7 +159,7 @@ public class RetryInvocationHandler<T> implements RpcInvocationHandler {
|
|
|
// single actual fail over.
|
|
|
synchronized (proxyProvider) {
|
|
|
if (invocationAttemptFailoverCount == proxyProviderFailoverCount) {
|
|
|
- proxyProvider.performFailover(currentProxy);
|
|
|
+ proxyProvider.performFailover(currentProxy.proxy);
|
|
|
proxyProviderFailoverCount++;
|
|
|
currentProxy = proxyProvider.getProxy();
|
|
|
} else {
|
|
@@ -183,7 +187,7 @@ public class RetryInvocationHandler<T> implements RpcInvocationHandler {
|
|
|
if (!method.isAccessible()) {
|
|
|
method.setAccessible(true);
|
|
|
}
|
|
|
- return method.invoke(currentProxy, args);
|
|
|
+ return method.invoke(currentProxy.proxy, args);
|
|
|
} catch (InvocationTargetException e) {
|
|
|
throw e.getCause();
|
|
|
}
|
|
@@ -208,7 +212,7 @@ public class RetryInvocationHandler<T> implements RpcInvocationHandler {
|
|
|
|
|
|
@Override //RpcInvocationHandler
|
|
|
public ConnectionId getConnectionId() {
|
|
|
- return RPC.getConnectionIdForProxy(currentProxy);
|
|
|
+ return RPC.getConnectionIdForProxy(currentProxy.proxy);
|
|
|
}
|
|
|
|
|
|
}
|