|
@@ -18,13 +18,14 @@
|
|
|
package org.apache.hadoop.io.retry;
|
|
|
|
|
|
import com.google.common.annotations.VisibleForTesting;
|
|
|
-import org.apache.commons.logging.Log;
|
|
|
-import org.apache.commons.logging.LogFactory;
|
|
|
import org.apache.hadoop.classification.InterfaceAudience;
|
|
|
import org.apache.hadoop.io.retry.FailoverProxyProvider.ProxyInfo;
|
|
|
import org.apache.hadoop.io.retry.RetryPolicy.RetryAction;
|
|
|
import org.apache.hadoop.ipc.*;
|
|
|
import org.apache.hadoop.ipc.Client.ConnectionId;
|
|
|
+import org.apache.hadoop.util.Time;
|
|
|
+import org.slf4j.Logger;
|
|
|
+import org.slf4j.LoggerFactory;
|
|
|
|
|
|
import java.io.IOException;
|
|
|
import java.io.InterruptedIOException;
|
|
@@ -41,33 +42,51 @@ import java.util.Map;
|
|
|
*/
|
|
|
@InterfaceAudience.Private
|
|
|
public class RetryInvocationHandler<T> implements RpcInvocationHandler {
|
|
|
- public static final Log LOG = LogFactory.getLog(RetryInvocationHandler.class);
|
|
|
+ public static final Logger LOG = LoggerFactory.getLogger(
|
|
|
+ RetryInvocationHandler.class);
|
|
|
|
|
|
static class Call {
|
|
|
private final Method method;
|
|
|
private final Object[] args;
|
|
|
private final boolean isRpc;
|
|
|
private final int callId;
|
|
|
- final Counters counters;
|
|
|
+ private final Counters counters = new Counters();
|
|
|
|
|
|
private final RetryPolicy retryPolicy;
|
|
|
private final RetryInvocationHandler<?> retryInvocationHandler;
|
|
|
|
|
|
+ private RetryInfo retryInfo;
|
|
|
+
|
|
|
Call(Method method, Object[] args, boolean isRpc, int callId,
|
|
|
- Counters counters, RetryInvocationHandler<?> retryInvocationHandler) {
|
|
|
+ RetryInvocationHandler<?> retryInvocationHandler) {
|
|
|
this.method = method;
|
|
|
this.args = args;
|
|
|
this.isRpc = isRpc;
|
|
|
this.callId = callId;
|
|
|
- this.counters = counters;
|
|
|
|
|
|
this.retryPolicy = retryInvocationHandler.getRetryPolicy(method);
|
|
|
this.retryInvocationHandler = retryInvocationHandler;
|
|
|
}
|
|
|
|
|
|
+ int getCallId() {
|
|
|
+ return callId;
|
|
|
+ }
|
|
|
+
|
|
|
+ Counters getCounters() {
|
|
|
+ return counters;
|
|
|
+ }
|
|
|
+
|
|
|
+ synchronized Long getWaitTime(final long now) {
|
|
|
+ return retryInfo == null? null: retryInfo.retryTime - now;
|
|
|
+ }
|
|
|
+
|
|
|
/** Invoke the call once without retrying. */
|
|
|
synchronized CallReturn invokeOnce() {
|
|
|
try {
|
|
|
+ if (retryInfo != null) {
|
|
|
+ return processWaitTimeAndRetryInfo();
|
|
|
+ }
|
|
|
+
|
|
|
// The number of times this invocation handler has ever been failed over
|
|
|
// before this method invocation attempt. Used to prevent concurrent
|
|
|
// failed method invocations from triggering multiple failover attempts.
|
|
@@ -76,28 +95,70 @@ public class RetryInvocationHandler<T> implements RpcInvocationHandler {
|
|
|
return invoke();
|
|
|
} catch (Exception e) {
|
|
|
if (LOG.isTraceEnabled()) {
|
|
|
- LOG.trace(this, e);
|
|
|
+ LOG.trace(toString(), e);
|
|
|
}
|
|
|
if (Thread.currentThread().isInterrupted()) {
|
|
|
// If interrupted, do not retry.
|
|
|
throw e;
|
|
|
}
|
|
|
- retryInvocationHandler.handleException(
|
|
|
- method, retryPolicy, failoverCount, counters, e);
|
|
|
- return CallReturn.RETRY;
|
|
|
+
|
|
|
+ retryInfo = retryInvocationHandler.handleException(
|
|
|
+ method, callId, retryPolicy, counters, failoverCount, e);
|
|
|
+ return processWaitTimeAndRetryInfo();
|
|
|
}
|
|
|
} catch(Throwable t) {
|
|
|
return new CallReturn(t);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * It first processes the wait time, if there is any,
|
|
|
+ * and then invokes {@link #processRetryInfo()}.
|
|
|
+ *
|
|
|
+ * If the wait time is positive, it either sleeps for synchronous calls
|
|
|
+ * or immediately returns for asynchronous calls.
|
|
|
+ *
|
|
|
+ * @return {@link CallReturn#RETRY} if the retryInfo is processed;
|
|
|
+ * otherwise, return {@link CallReturn#WAIT_RETRY}.
|
|
|
+ */
|
|
|
+ CallReturn processWaitTimeAndRetryInfo() throws InterruptedIOException {
|
|
|
+ final Long waitTime = getWaitTime(Time.monotonicNow());
|
|
|
+ LOG.trace("#{} processRetryInfo: retryInfo={}, waitTime={}",
|
|
|
+ callId, retryInfo, waitTime);
|
|
|
+ if (waitTime != null && waitTime > 0) {
|
|
|
+ try {
|
|
|
+ Thread.sleep(retryInfo.delay);
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ Thread.currentThread().interrupt();
|
|
|
+ LOG.warn("Interrupted while waiting to retry", e);
|
|
|
+ InterruptedIOException intIOE = new InterruptedIOException(
|
|
|
+ "Retry interrupted");
|
|
|
+ intIOE.initCause(e);
|
|
|
+ throw intIOE;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ processRetryInfo();
|
|
|
+ return CallReturn.RETRY;
|
|
|
+ }
|
|
|
+
|
|
|
+ synchronized void processRetryInfo() {
|
|
|
+ counters.retries++;
|
|
|
+ if (retryInfo.isFailover()) {
|
|
|
+ retryInvocationHandler.proxyDescriptor.failover(
|
|
|
+ retryInfo.expectedFailoverCount, method, callId);
|
|
|
+ counters.failovers++;
|
|
|
+ }
|
|
|
+ retryInfo = null;
|
|
|
+ }
|
|
|
+
|
|
|
CallReturn invoke() throws Throwable {
|
|
|
return new CallReturn(invokeMethod());
|
|
|
}
|
|
|
|
|
|
Object invokeMethod() throws Throwable {
|
|
|
if (isRpc) {
|
|
|
- Client.setCallIdAndRetryCount(callId, counters.retries);
|
|
|
+ Client.setCallIdAndRetryCount(callId, counters.retries,
|
|
|
+ retryInvocationHandler.asyncCallHandler);
|
|
|
}
|
|
|
return retryInvocationHandler.invokeMethod(method, args);
|
|
|
}
|
|
@@ -146,15 +207,16 @@ public class RetryInvocationHandler<T> implements RpcInvocationHandler {
|
|
|
return failoverCount;
|
|
|
}
|
|
|
|
|
|
- synchronized void failover(long expectedFailoverCount, Method method) {
|
|
|
+ synchronized void failover(long expectedFailoverCount, Method method,
|
|
|
+ int callId) {
|
|
|
// Make sure that concurrent failed invocations only cause a single
|
|
|
// actual failover.
|
|
|
if (failoverCount == expectedFailoverCount) {
|
|
|
fpp.performFailover(proxyInfo.proxy);
|
|
|
failoverCount++;
|
|
|
} else {
|
|
|
- LOG.warn("A failover has occurred since the start of "
|
|
|
- + proxyInfo.getString(method.getName()));
|
|
|
+ LOG.warn("A failover has occurred since the start of call #" + callId
|
|
|
+ + " " + proxyInfo.getString(method.getName()));
|
|
|
}
|
|
|
proxyInfo = fpp.getProxy();
|
|
|
}
|
|
@@ -172,22 +234,33 @@ public class RetryInvocationHandler<T> implements RpcInvocationHandler {
|
|
|
}
|
|
|
|
|
|
private static class RetryInfo {
|
|
|
+ private final long retryTime;
|
|
|
private final long delay;
|
|
|
- private final RetryAction failover;
|
|
|
- private final RetryAction fail;
|
|
|
+ private final RetryAction action;
|
|
|
+ private final long expectedFailoverCount;
|
|
|
|
|
|
- RetryInfo(long delay, RetryAction failover, RetryAction fail) {
|
|
|
+ RetryInfo(long delay, RetryAction action, long expectedFailoverCount) {
|
|
|
this.delay = delay;
|
|
|
- this.failover = failover;
|
|
|
- this.fail = fail;
|
|
|
+ this.retryTime = Time.monotonicNow() + delay;
|
|
|
+ this.action = action;
|
|
|
+ this.expectedFailoverCount = expectedFailoverCount;
|
|
|
+ }
|
|
|
+
|
|
|
+ boolean isFailover() {
|
|
|
+ return action != null
|
|
|
+ && action.action == RetryAction.RetryDecision.FAILOVER_AND_RETRY;
|
|
|
+ }
|
|
|
+
|
|
|
+ boolean isFail() {
|
|
|
+ return action != null
|
|
|
+ && action.action == RetryAction.RetryDecision.FAIL;
|
|
|
}
|
|
|
|
|
|
static RetryInfo newRetryInfo(RetryPolicy policy, Exception e,
|
|
|
- Counters counters, boolean idempotentOrAtMostOnce) throws Exception {
|
|
|
+ Counters counters, boolean idempotentOrAtMostOnce,
|
|
|
+ long expectedFailoverCount) throws Exception {
|
|
|
+ RetryAction max = null;
|
|
|
long maxRetryDelay = 0;
|
|
|
- RetryAction failover = null;
|
|
|
- RetryAction retry = null;
|
|
|
- RetryAction fail = null;
|
|
|
|
|
|
final Iterable<Exception> exceptions = e instanceof MultiException ?
|
|
|
((MultiException) e).getExceptions().values()
|
|
@@ -195,23 +268,19 @@ public class RetryInvocationHandler<T> implements RpcInvocationHandler {
|
|
|
for (Exception exception : exceptions) {
|
|
|
final RetryAction a = policy.shouldRetry(exception,
|
|
|
counters.retries, counters.failovers, idempotentOrAtMostOnce);
|
|
|
- if (a.action == RetryAction.RetryDecision.FAIL) {
|
|
|
- fail = a;
|
|
|
- } else {
|
|
|
+ if (a.action != RetryAction.RetryDecision.FAIL) {
|
|
|
// must be a retry or failover
|
|
|
- if (a.action == RetryAction.RetryDecision.FAILOVER_AND_RETRY) {
|
|
|
- failover = a;
|
|
|
- } else {
|
|
|
- retry = a;
|
|
|
- }
|
|
|
if (a.delayMillis > maxRetryDelay) {
|
|
|
maxRetryDelay = a.delayMillis;
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ if (max == null || max.action.compareTo(a.action) < 0) {
|
|
|
+ max = a;
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
- return new RetryInfo(maxRetryDelay, failover,
|
|
|
- failover == null && retry == null? fail: null);
|
|
|
+ return new RetryInfo(maxRetryDelay, max, expectedFailoverCount);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -246,13 +315,12 @@ public class RetryInvocationHandler<T> implements RpcInvocationHandler {
|
|
|
return proxyDescriptor.getFailoverCount();
|
|
|
}
|
|
|
|
|
|
- private Call newCall(Method method, Object[] args, boolean isRpc, int callId,
|
|
|
- Counters counters) {
|
|
|
+ private Call newCall(Method method, Object[] args, boolean isRpc,
|
|
|
+ int callId) {
|
|
|
if (Client.isAsynchronousMode()) {
|
|
|
- return asyncCallHandler.newAsyncCall(method, args, isRpc, callId,
|
|
|
- counters, this);
|
|
|
+ return asyncCallHandler.newAsyncCall(method, args, isRpc, callId, this);
|
|
|
} else {
|
|
|
- return new Call(method, args, isRpc, callId, counters, this);
|
|
|
+ return new Call(method, args, isRpc, callId, this);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -261,9 +329,8 @@ public class RetryInvocationHandler<T> implements RpcInvocationHandler {
|
|
|
throws Throwable {
|
|
|
final boolean isRpc = isRpcInvocation(proxyDescriptor.getProxy());
|
|
|
final int callId = isRpc? Client.nextCallId(): RpcConstants.INVALID_CALL_ID;
|
|
|
- final Counters counters = new Counters();
|
|
|
|
|
|
- final Call call = newCall(method, args, isRpc, callId, counters);
|
|
|
+ final Call call = newCall(method, args, isRpc, callId);
|
|
|
while (true) {
|
|
|
final CallReturn c = call.invokeOnce();
|
|
|
final CallReturn.State state = c.getState();
|
|
@@ -275,45 +342,24 @@ public class RetryInvocationHandler<T> implements RpcInvocationHandler {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private void handleException(final Method method, final RetryPolicy policy,
|
|
|
- final long expectedFailoverCount, final Counters counters,
|
|
|
- final Exception ex) throws Exception {
|
|
|
- final RetryInfo retryInfo = RetryInfo.newRetryInfo(policy, ex, counters,
|
|
|
- proxyDescriptor.idempotentOrAtMostOnce(method));
|
|
|
- counters.retries++;
|
|
|
-
|
|
|
- if (retryInfo.fail != null) {
|
|
|
+ private RetryInfo handleException(final Method method, final int callId,
|
|
|
+ final RetryPolicy policy, final Counters counters,
|
|
|
+ final long expectFailoverCount, final Exception e) throws Exception {
|
|
|
+ final RetryInfo retryInfo = RetryInfo.newRetryInfo(policy, e,
|
|
|
+ counters, proxyDescriptor.idempotentOrAtMostOnce(method),
|
|
|
+ expectFailoverCount);
|
|
|
+ if (retryInfo.isFail()) {
|
|
|
// fail.
|
|
|
- if (retryInfo.fail.reason != null) {
|
|
|
- LOG.warn("Exception while invoking "
|
|
|
+ if (retryInfo.action.reason != null) {
|
|
|
+ LOG.warn("Exception while invoking call #" + callId + " "
|
|
|
+ proxyDescriptor.getProxyInfo().getString(method.getName())
|
|
|
- + ". Not retrying because " + retryInfo.fail.reason, ex);
|
|
|
- }
|
|
|
- throw ex;
|
|
|
- }
|
|
|
-
|
|
|
- // retry
|
|
|
- final boolean isFailover = retryInfo.failover != null;
|
|
|
-
|
|
|
- log(method, isFailover, counters.failovers, retryInfo.delay, ex);
|
|
|
-
|
|
|
- if (retryInfo.delay > 0) {
|
|
|
- try {
|
|
|
- Thread.sleep(retryInfo.delay);
|
|
|
- } catch (InterruptedException e) {
|
|
|
- Thread.currentThread().interrupt();
|
|
|
- LOG.warn("Interrupted while waiting to retry", e);
|
|
|
- InterruptedIOException intIOE = new InterruptedIOException(
|
|
|
- "Retry interrupted");
|
|
|
- intIOE.initCause(e);
|
|
|
- throw intIOE;
|
|
|
+ + ". Not retrying because " + retryInfo.action.reason, e);
|
|
|
}
|
|
|
+ throw e;
|
|
|
}
|
|
|
|
|
|
- if (isFailover) {
|
|
|
- proxyDescriptor.failover(expectedFailoverCount, method);
|
|
|
- counters.failovers++;
|
|
|
- }
|
|
|
+ log(method, retryInfo.isFailover(), counters.failovers, retryInfo.delay, e);
|
|
|
+ return retryInfo;
|
|
|
}
|
|
|
|
|
|
private void log(final Method method, final boolean isFailover,
|