|
@@ -33,6 +33,8 @@ import org.apache.commons.logging.LogFactory;
|
|
import org.apache.hadoop.ipc.RemoteException;
|
|
import org.apache.hadoop.ipc.RemoteException;
|
|
import org.apache.hadoop.ipc.StandbyException;
|
|
import org.apache.hadoop.ipc.StandbyException;
|
|
|
|
|
|
|
|
+import com.google.common.annotations.VisibleForTesting;
|
|
|
|
+
|
|
/**
|
|
/**
|
|
* <p>
|
|
* <p>
|
|
* A collection of useful implementations of {@link RetryPolicy}.
|
|
* A collection of useful implementations of {@link RetryPolicy}.
|
|
@@ -42,6 +44,8 @@ public class RetryPolicies {
|
|
|
|
|
|
public static final Log LOG = LogFactory.getLog(RetryPolicies.class);
|
|
public static final Log LOG = LogFactory.getLog(RetryPolicies.class);
|
|
|
|
|
|
|
|
+ private static final Random RAND = new Random();
|
|
|
|
+
|
|
/**
|
|
/**
|
|
* <p>
|
|
* <p>
|
|
* Try once, and fail by re-throwing the exception.
|
|
* Try once, and fail by re-throwing the exception.
|
|
@@ -137,7 +141,14 @@ public class RetryPolicies {
|
|
|
|
|
|
public static final RetryPolicy failoverOnNetworkException(
|
|
public static final RetryPolicy failoverOnNetworkException(
|
|
RetryPolicy fallbackPolicy, int maxFailovers) {
|
|
RetryPolicy fallbackPolicy, int maxFailovers) {
|
|
- return new FailoverOnNetworkExceptionRetry(fallbackPolicy, maxFailovers);
|
|
|
|
|
|
+ return failoverOnNetworkException(fallbackPolicy, maxFailovers, 0, 0);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ public static final RetryPolicy failoverOnNetworkException(
|
|
|
|
+ RetryPolicy fallbackPolicy, int maxFailovers, long delayMillis,
|
|
|
|
+ long maxDelayBase) {
|
|
|
|
+ return new FailoverOnNetworkExceptionRetry(fallbackPolicy, maxFailovers,
|
|
|
|
+ delayMillis, maxDelayBase);
|
|
}
|
|
}
|
|
|
|
|
|
static class TryOnceThenFail implements RetryPolicy {
|
|
static class TryOnceThenFail implements RetryPolicy {
|
|
@@ -176,12 +187,8 @@ public class RetryPolicies {
|
|
if (retries >= maxRetries) {
|
|
if (retries >= maxRetries) {
|
|
throw e;
|
|
throw e;
|
|
}
|
|
}
|
|
- try {
|
|
|
|
- timeUnit.sleep(calculateSleepTime(retries));
|
|
|
|
- } catch (InterruptedException ie) {
|
|
|
|
- // retry
|
|
|
|
- }
|
|
|
|
- return RetryAction.RETRY;
|
|
|
|
|
|
+ return new RetryAction(RetryAction.RetryDecision.RETRY,
|
|
|
|
+ timeUnit.toMillis(calculateSleepTime(retries)));
|
|
}
|
|
}
|
|
|
|
|
|
protected abstract long calculateSleepTime(int retries);
|
|
protected abstract long calculateSleepTime(int retries);
|
|
@@ -268,7 +275,7 @@ public class RetryPolicies {
|
|
}
|
|
}
|
|
|
|
|
|
static class ExponentialBackoffRetry extends RetryLimited {
|
|
static class ExponentialBackoffRetry extends RetryLimited {
|
|
- private Random r = new Random();
|
|
|
|
|
|
+
|
|
public ExponentialBackoffRetry(
|
|
public ExponentialBackoffRetry(
|
|
int maxRetries, long sleepTime, TimeUnit timeUnit) {
|
|
int maxRetries, long sleepTime, TimeUnit timeUnit) {
|
|
super(maxRetries, sleepTime, timeUnit);
|
|
super(maxRetries, sleepTime, timeUnit);
|
|
@@ -276,16 +283,19 @@ public class RetryPolicies {
|
|
|
|
|
|
@Override
|
|
@Override
|
|
protected long calculateSleepTime(int retries) {
|
|
protected long calculateSleepTime(int retries) {
|
|
- return sleepTime*r.nextInt(1<<(retries+1));
|
|
|
|
|
|
+ return calculateExponentialTime(sleepTime, retries + 1);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
- /*
|
|
|
|
|
|
+ /**
|
|
* Fail over and retry in the case of:
|
|
* Fail over and retry in the case of:
|
|
* Remote StandbyException (server is up, but is not the active server)
|
|
* Remote StandbyException (server is up, but is not the active server)
|
|
* Immediate socket exceptions (e.g. no route to host, econnrefused)
|
|
* Immediate socket exceptions (e.g. no route to host, econnrefused)
|
|
* Socket exceptions after initial connection when operation is idempotent
|
|
* Socket exceptions after initial connection when operation is idempotent
|
|
*
|
|
*
|
|
|
|
+ * The first failover is immediate, while all subsequent failovers wait an
|
|
|
|
+ * exponentially-increasing random amount of time.
|
|
|
|
+ *
|
|
* Fail immediately in the case of:
|
|
* Fail immediately in the case of:
|
|
* Socket exceptions after initial connection when operation is not idempotent
|
|
* Socket exceptions after initial connection when operation is not idempotent
|
|
*
|
|
*
|
|
@@ -295,11 +305,20 @@ public class RetryPolicies {
|
|
|
|
|
|
private RetryPolicy fallbackPolicy;
|
|
private RetryPolicy fallbackPolicy;
|
|
private int maxFailovers;
|
|
private int maxFailovers;
|
|
|
|
+ private long delayMillis;
|
|
|
|
+ private long maxDelayBase;
|
|
|
|
|
|
public FailoverOnNetworkExceptionRetry(RetryPolicy fallbackPolicy,
|
|
public FailoverOnNetworkExceptionRetry(RetryPolicy fallbackPolicy,
|
|
int maxFailovers) {
|
|
int maxFailovers) {
|
|
|
|
+ this(fallbackPolicy, maxFailovers, 0, 0);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ public FailoverOnNetworkExceptionRetry(RetryPolicy fallbackPolicy,
|
|
|
|
+ int maxFailovers, long delayMillis, long maxDelayBase) {
|
|
this.fallbackPolicy = fallbackPolicy;
|
|
this.fallbackPolicy = fallbackPolicy;
|
|
this.maxFailovers = maxFailovers;
|
|
this.maxFailovers = maxFailovers;
|
|
|
|
+ this.delayMillis = delayMillis;
|
|
|
|
+ this.maxDelayBase = maxDelayBase;
|
|
}
|
|
}
|
|
|
|
|
|
@Override
|
|
@Override
|
|
@@ -314,8 +333,13 @@ public class RetryPolicies {
|
|
if (e instanceof ConnectException ||
|
|
if (e instanceof ConnectException ||
|
|
e instanceof NoRouteToHostException ||
|
|
e instanceof NoRouteToHostException ||
|
|
e instanceof UnknownHostException ||
|
|
e instanceof UnknownHostException ||
|
|
- e instanceof StandbyException) {
|
|
|
|
- return RetryAction.FAILOVER_AND_RETRY;
|
|
|
|
|
|
+ e instanceof StandbyException ||
|
|
|
|
+ isWrappedStandbyException(e)) {
|
|
|
|
+ return new RetryAction(
|
|
|
|
+ RetryAction.RetryDecision.FAILOVER_AND_RETRY,
|
|
|
|
+ // retry immediately if this is our first failover, sleep otherwise
|
|
|
|
+ failovers == 0 ? 0 :
|
|
|
|
+ calculateExponentialTime(delayMillis, failovers, maxDelayBase));
|
|
} else if (e instanceof SocketException ||
|
|
} else if (e instanceof SocketException ||
|
|
e instanceof IOException) {
|
|
e instanceof IOException) {
|
|
if (isMethodIdempotent) {
|
|
if (isMethodIdempotent) {
|
|
@@ -330,4 +354,34 @@ public class RetryPolicies {
|
|
}
|
|
}
|
|
|
|
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Return a value which is <code>time</code> increasing exponentially as a
|
|
|
|
+ * function of <code>retries</code>, +/- 0%-50% of that value, chosen
|
|
|
|
+ * randomly.
|
|
|
|
+ *
|
|
|
|
+ * @param time the base amount of time to work with
|
|
|
|
+ * @param retries the number of retries that have so occurred so far
|
|
|
|
+ * @param cap value at which to cap the base sleep time
|
|
|
|
+ * @return an amount of time to sleep
|
|
|
|
+ */
|
|
|
|
+ @VisibleForTesting
|
|
|
|
+ public static long calculateExponentialTime(long time, int retries,
|
|
|
|
+ long cap) {
|
|
|
|
+ long baseTime = Math.min(time * ((long)1 << retries), cap);
|
|
|
|
+ return (long) (baseTime * (RAND.nextFloat() + 0.5));
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private static long calculateExponentialTime(long time, int retries) {
|
|
|
|
+ return calculateExponentialTime(time, retries, Long.MAX_VALUE);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private static boolean isWrappedStandbyException(Exception e) {
|
|
|
|
+ if (!(e instanceof RemoteException)) {
|
|
|
|
+ return false;
|
|
|
|
+ }
|
|
|
|
+ Exception unwrapped = ((RemoteException)e).unwrapRemoteException(
|
|
|
|
+ StandbyException.class);
|
|
|
|
+ return unwrapped instanceof StandbyException;
|
|
|
|
+ }
|
|
}
|
|
}
|