|
@@ -107,9 +107,23 @@ public class TimelineClientImpl extends TimelineClient {
|
|
|
private URI resURI;
|
|
|
private boolean isEnabled;
|
|
|
|
|
|
- private TimelineJerseyRetryFilter retryFilter;
|
|
|
+ @Private
|
|
|
+ @VisibleForTesting
|
|
|
+ TimelineClientConnectionRetry connectionRetry;
|
|
|
+
|
|
|
+ // Abstract class for an operation that should be retried by timeline client
|
|
|
+ private static abstract class TimelineClientRetryOp {
|
|
|
+ // The operation that should be retried
|
|
|
+ public abstract Object run() throws IOException;
|
|
|
+ // The method to indicate if we should retry given the incoming exception
|
|
|
+ public abstract boolean shouldRetryOn(Exception e);
|
|
|
+ }
|
|
|
|
|
|
- static class TimelineJerseyRetryFilter extends ClientFilter {
|
|
|
+ // Class to handle retry
|
|
|
+ // Outside this class, only visible to tests
|
|
|
+ @Private
|
|
|
+ @VisibleForTesting
|
|
|
+ static class TimelineClientConnectionRetry {
|
|
|
// maxRetries < 0 means keep trying
|
|
|
@Private
|
|
|
@VisibleForTesting
|
|
@@ -119,14 +133,14 @@ public class TimelineClientImpl extends TimelineClient {
|
|
|
@VisibleForTesting
|
|
|
public long retryInterval;
|
|
|
|
|
|
- // Indicates if retries happened last time
|
|
|
+ // Indicates if retries happened last time. Only tests should read it.
|
|
|
+ // In unit tests, retryOn() calls should _not_ be concurrent.
|
|
|
@Private
|
|
|
@VisibleForTesting
|
|
|
public boolean retried = false;
|
|
|
|
|
|
// Constructor with default retry settings
|
|
|
- public TimelineJerseyRetryFilter(Configuration conf) {
|
|
|
- super();
|
|
|
+ public TimelineClientConnectionRetry(Configuration conf) {
|
|
|
maxRetries = conf.getInt(
|
|
|
YarnConfiguration.TIMELINE_SERVICE_CLIENT_MAX_RETRIES,
|
|
|
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_MAX_RETRIES);
|
|
@@ -135,32 +149,36 @@ public class TimelineClientImpl extends TimelineClient {
|
|
|
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS);
|
|
|
}
|
|
|
|
|
|
- @Override
|
|
|
- public ClientResponse handle(ClientRequest cr)
|
|
|
- throws ClientHandlerException {
|
|
|
+ public Object retryOn(TimelineClientRetryOp op)
|
|
|
+ throws RuntimeException, IOException {
|
|
|
int leftRetries = maxRetries;
|
|
|
retried = false;
|
|
|
+
|
|
|
// keep trying
|
|
|
while (true) {
|
|
|
try {
|
|
|
- // try pass the request on, if fail, keep retrying
|
|
|
- return getNext().handle(cr);
|
|
|
- } catch (ClientHandlerException e) {
|
|
|
+ // try to perform the op; if it fails, keep retrying
|
|
|
+ return op.run();
|
|
|
+ } catch (IOException e) {
|
|
|
+ // We may only throw runtime and IO exceptions. After switching to
|
|
|
+ // Java 1.7, we can merge these two catch blocks into one.
|
|
|
+
|
|
|
// break if there's no retries left
|
|
|
if (leftRetries == 0) {
|
|
|
break;
|
|
|
}
|
|
|
- if(e.getCause() instanceof ConnectException) {
|
|
|
- if (leftRetries > 0) {
|
|
|
- LOG.info("Connection Timeout (" + cr.getURI() + "), will try "
|
|
|
- + leftRetries + " more time(s).");
|
|
|
- } else {
|
|
|
- // note that maxRetries may be -1 at the very beginning
|
|
|
- // maxRetries = -1 means keep trying
|
|
|
- LOG.info("Connection Timeout (" + cr.getURI()
|
|
|
- + "), will keep retrying.");
|
|
|
- }
|
|
|
- retried = true;
|
|
|
+ if (op.shouldRetryOn(e)) {
|
|
|
+ logException(e, leftRetries);
|
|
|
+ } else {
|
|
|
+ throw e;
|
|
|
+ }
|
|
|
+ } catch (RuntimeException e) {
|
|
|
+ // break if there's no retries left
|
|
|
+ if (leftRetries == 0) {
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ if (op.shouldRetryOn(e)) {
|
|
|
+ logException(e, leftRetries);
|
|
|
} else {
|
|
|
throw e;
|
|
|
}
|
|
@@ -168,6 +186,7 @@ public class TimelineClientImpl extends TimelineClient {
|
|
|
if (leftRetries > 0) {
|
|
|
leftRetries--;
|
|
|
}
|
|
|
+ retried = true;
|
|
|
try {
|
|
|
// sleep for the given time interval
|
|
|
Thread.sleep(retryInterval);
|
|
@@ -175,10 +194,51 @@ public class TimelineClientImpl extends TimelineClient {
|
|
|
LOG.warn("Client retry sleep interrupted! ");
|
|
|
}
|
|
|
}
|
|
|
- throw new ClientHandlerException("Failed to connect to timeline server. "
|
|
|
+ throw new RuntimeException("Failed to connect to timeline server. "
|
|
|
+ "Connection retries limit exceeded. "
|
|
|
+ "The posted timeline event may be missing");
|
|
|
};
|
|
|
+
|
|
|
+ private void logException(Exception e, int leftRetries) {
|
|
|
+ if (leftRetries > 0) {
|
|
|
+ LOG.info("Exception caught by TimelineClientConnectionRetry,"
|
|
|
+ + " will try " + leftRetries + " more time(s).\nMessage: "
|
|
|
+ + e.getMessage());
|
|
|
+ } else {
|
|
|
+ // note that maxRetries may be -1 at the very beginning
|
|
|
+ LOG.info("ConnectionException caught by TimelineClientConnectionRetry,"
|
|
|
+ + " will keep retrying.\nMessage: "
|
|
|
+ + e.getMessage());
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private class TimelineJerseyRetryFilter extends ClientFilter {
|
|
|
+ @Override
|
|
|
+ public ClientResponse handle(final ClientRequest cr)
|
|
|
+ throws ClientHandlerException {
|
|
|
+ // Set up the retry operation
|
|
|
+ TimelineClientRetryOp jerseyRetryOp = new TimelineClientRetryOp() {
|
|
|
+ @Override
|
|
|
+ public Object run() {
|
|
|
+ // Try pass the request, if fail, keep retrying
|
|
|
+ return getNext().handle(cr);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public boolean shouldRetryOn(Exception e) {
|
|
|
+ // Only retry on connection exceptions
|
|
|
+ return (e instanceof ClientHandlerException)
|
|
|
+ && (e.getCause() instanceof ConnectException);
|
|
|
+ }
|
|
|
+ };
|
|
|
+ try {
|
|
|
+ return (ClientResponse) connectionRetry.retryOn(jerseyRetryOp);
|
|
|
+ } catch (IOException e) {
|
|
|
+ throw new ClientHandlerException("Jersey retry failed!\nMessage: "
|
|
|
+ + e.getMessage());
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
public TimelineClientImpl() {
|
|
@@ -201,10 +261,12 @@ public class TimelineClientImpl extends TimelineClient {
|
|
|
authenticator = new PseudoDelegationTokenAuthenticator();
|
|
|
}
|
|
|
authenticator.setConnectionConfigurator(connConfigurator);
|
|
|
+ token = new DelegationTokenAuthenticatedURL.Token();
|
|
|
+
|
|
|
+ connectionRetry = new TimelineClientConnectionRetry(conf);
|
|
|
client = new Client(new URLConnectionClientHandler(
|
|
|
new TimelineURLConnectionFactory()), cc);
|
|
|
- token = new DelegationTokenAuthenticatedURL.Token();
|
|
|
- retryFilter = new TimelineJerseyRetryFilter(conf);
|
|
|
+ TimelineJerseyRetryFilter retryFilter = new TimelineJerseyRetryFilter();
|
|
|
client.addFilter(retryFilter);
|
|
|
|
|
|
if (YarnConfiguration.useHttps(conf)) {
|
|
@@ -282,36 +344,45 @@ public class TimelineClientImpl extends TimelineClient {
|
|
|
@Override
|
|
|
public Token<TimelineDelegationTokenIdentifier> getDelegationToken(
|
|
|
final String renewer) throws IOException, YarnException {
|
|
|
- boolean isProxyAccess =
|
|
|
- UserGroupInformation.getCurrentUser().getAuthenticationMethod()
|
|
|
- == UserGroupInformation.AuthenticationMethod.PROXY;
|
|
|
- UserGroupInformation callerUGI = isProxyAccess ?
|
|
|
- UserGroupInformation.getCurrentUser().getRealUser()
|
|
|
- : UserGroupInformation.getCurrentUser();
|
|
|
- final String doAsUser = isProxyAccess ?
|
|
|
- UserGroupInformation.getCurrentUser().getShortUserName() : null;
|
|
|
- try {
|
|
|
- return callerUGI.doAs(
|
|
|
- new PrivilegedExceptionAction<Token<TimelineDelegationTokenIdentifier>>() {
|
|
|
- @Override
|
|
|
- public Token<TimelineDelegationTokenIdentifier> run() throws Exception {
|
|
|
- DelegationTokenAuthenticatedURL authUrl =
|
|
|
- new DelegationTokenAuthenticatedURL(authenticator, connConfigurator);
|
|
|
- return (Token) authUrl.getDelegationToken(
|
|
|
- resURI.toURL(), token, renewer, doAsUser);
|
|
|
+ // Set up the retry operation
|
|
|
+ TimelineClientRetryOp tokenRetryOp = new TimelineClientRetryOp() {
|
|
|
+ @Override
|
|
|
+ public Object run() throws IOException {
|
|
|
+ // Try pass the request, if fail, keep retrying
|
|
|
+ boolean isProxyAccess =
|
|
|
+ UserGroupInformation.getCurrentUser().getAuthenticationMethod()
|
|
|
+ == UserGroupInformation.AuthenticationMethod.PROXY;
|
|
|
+ UserGroupInformation callerUGI = isProxyAccess ?
|
|
|
+ UserGroupInformation.getCurrentUser().getRealUser()
|
|
|
+ : UserGroupInformation.getCurrentUser();
|
|
|
+ final String doAsUser = isProxyAccess ?
|
|
|
+ UserGroupInformation.getCurrentUser().getShortUserName() : null;
|
|
|
+ try {
|
|
|
+ return callerUGI.doAs(
|
|
|
+ new PrivilegedExceptionAction<Token<TimelineDelegationTokenIdentifier>>() {
|
|
|
+ @Override
|
|
|
+ public Token<TimelineDelegationTokenIdentifier> run() throws Exception {
|
|
|
+ DelegationTokenAuthenticatedURL authUrl =
|
|
|
+ new DelegationTokenAuthenticatedURL(authenticator, connConfigurator);
|
|
|
+ return (Token) authUrl.getDelegationToken(
|
|
|
+ resURI.toURL(), token, renewer, doAsUser);
|
|
|
+ }
|
|
|
+ });
|
|
|
+ } catch (UndeclaredThrowableException e) {
|
|
|
+ throw new IOException(e.getCause());
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ throw new IOException(e);
|
|
|
}
|
|
|
- });
|
|
|
- } catch (UndeclaredThrowableException e) {
|
|
|
- throw new IOException(e.getCause());
|
|
|
- } catch (InterruptedException e) {
|
|
|
- throw new IOException(e);
|
|
|
- }
|
|
|
- }
|
|
|
+ }
|
|
|
+ @Override
|
|
|
+ public boolean shouldRetryOn(Exception e) {
|
|
|
+ // Only retry on connection exceptions
|
|
|
+ return (e instanceof ConnectException);
|
|
|
+ }
|
|
|
+ };
|
|
|
|
|
|
- @Private
|
|
|
- @VisibleForTesting
|
|
|
- public TimelineJerseyRetryFilter getRetryFilter() {
|
|
|
- return retryFilter;
|
|
|
+ return (Token<TimelineDelegationTokenIdentifier>)
|
|
|
+ connectionRetry.retryOn(tokenRetryOp);
|
|
|
}
|
|
|
|
|
|
@Private
|