|
@@ -68,13 +68,13 @@ import org.codehaus.jackson.map.ObjectMapper;
|
|
|
import com.google.common.annotations.VisibleForTesting;
|
|
|
import com.google.common.base.Joiner;
|
|
|
import com.sun.jersey.api.client.Client;
|
|
|
-import com.sun.jersey.api.client.filter.ClientFilter;
|
|
|
-import com.sun.jersey.api.client.ClientResponse;
|
|
|
-import com.sun.jersey.api.client.ClientRequest;
|
|
|
import com.sun.jersey.api.client.ClientHandlerException;
|
|
|
+import com.sun.jersey.api.client.ClientRequest;
|
|
|
+import com.sun.jersey.api.client.ClientResponse;
|
|
|
import com.sun.jersey.api.client.WebResource;
|
|
|
import com.sun.jersey.api.client.config.ClientConfig;
|
|
|
import com.sun.jersey.api.client.config.DefaultClientConfig;
|
|
|
+import com.sun.jersey.api.client.filter.ClientFilter;
|
|
|
import com.sun.jersey.client.urlconnection.HttpURLConnectionFactory;
|
|
|
import com.sun.jersey.client.urlconnection.URLConnectionClientHandler;
|
|
|
|
|
@@ -124,6 +124,7 @@ public class TimelineClientImpl extends TimelineClient {
|
|
|
@Private
|
|
|
@VisibleForTesting
|
|
|
static class TimelineClientConnectionRetry {
|
|
|
+
|
|
|
// maxRetries < 0 means keep trying
|
|
|
@Private
|
|
|
@VisibleForTesting
|
|
@@ -344,8 +345,97 @@ public class TimelineClientImpl extends TimelineClient {
|
|
|
@Override
|
|
|
public Token<TimelineDelegationTokenIdentifier> getDelegationToken(
|
|
|
final String renewer) throws IOException, YarnException {
|
|
|
+ boolean isProxyAccess =
|
|
|
+ UserGroupInformation.getCurrentUser().getAuthenticationMethod()
|
|
|
+ == UserGroupInformation.AuthenticationMethod.PROXY;
|
|
|
+ final String doAsUser = isProxyAccess ?
|
|
|
+ UserGroupInformation.getCurrentUser().getShortUserName() : null;
|
|
|
+ PrivilegedExceptionAction<Token<TimelineDelegationTokenIdentifier>> getDTAction =
|
|
|
+ new PrivilegedExceptionAction<Token<TimelineDelegationTokenIdentifier>>() {
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public Token<TimelineDelegationTokenIdentifier> run()
|
|
|
+ throws Exception {
|
|
|
+ DelegationTokenAuthenticatedURL authUrl =
|
|
|
+ new DelegationTokenAuthenticatedURL(authenticator,
|
|
|
+ connConfigurator);
|
|
|
+ return (Token) authUrl.getDelegationToken(
|
|
|
+ resURI.toURL(), token, renewer, doAsUser);
|
|
|
+ }
|
|
|
+ };
|
|
|
+ return (Token<TimelineDelegationTokenIdentifier>) operateDelegationToken(getDTAction);
|
|
|
+ }
|
|
|
+
|
|
|
+ @SuppressWarnings("unchecked")
|
|
|
+ @Override
|
|
|
+ public long renewDelegationToken(
|
|
|
+ final Token<TimelineDelegationTokenIdentifier> timelineDT)
|
|
|
+ throws IOException, YarnException {
|
|
|
+ boolean isProxyAccess =
|
|
|
+ UserGroupInformation.getCurrentUser().getAuthenticationMethod()
|
|
|
+ == UserGroupInformation.AuthenticationMethod.PROXY;
|
|
|
+ final String doAsUser = isProxyAccess ?
|
|
|
+ UserGroupInformation.getCurrentUser().getShortUserName() : null;
|
|
|
+ PrivilegedExceptionAction<Long> renewDTAction =
|
|
|
+ new PrivilegedExceptionAction<Long>() {
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public Long run()
|
|
|
+ throws Exception {
|
|
|
+ // If the timeline DT to renew is different than cached, replace it.
|
|
|
+ // Token to set every time for retry, because when exception happens,
|
|
|
+ // DelegationTokenAuthenticatedURL will reset it to null;
|
|
|
+ if (!timelineDT.equals(token.getDelegationToken())) {
|
|
|
+ token.setDelegationToken((Token) timelineDT);
|
|
|
+ }
|
|
|
+ DelegationTokenAuthenticatedURL authUrl =
|
|
|
+ new DelegationTokenAuthenticatedURL(authenticator,
|
|
|
+ connConfigurator);
|
|
|
+ return authUrl
|
|
|
+ .renewDelegationToken(resURI.toURL(), token, doAsUser);
|
|
|
+ }
|
|
|
+ };
|
|
|
+ return (Long) operateDelegationToken(renewDTAction);
|
|
|
+ }
|
|
|
+
|
|
|
+ @SuppressWarnings("unchecked")
|
|
|
+ @Override
|
|
|
+ public void cancelDelegationToken(
|
|
|
+ final Token<TimelineDelegationTokenIdentifier> timelineDT)
|
|
|
+ throws IOException, YarnException {
|
|
|
+ boolean isProxyAccess =
|
|
|
+ UserGroupInformation.getCurrentUser().getAuthenticationMethod()
|
|
|
+ == UserGroupInformation.AuthenticationMethod.PROXY;
|
|
|
+ final String doAsUser = isProxyAccess ?
|
|
|
+ UserGroupInformation.getCurrentUser().getShortUserName() : null;
|
|
|
+ PrivilegedExceptionAction<Void> cancelDTAction =
|
|
|
+ new PrivilegedExceptionAction<Void>() {
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public Void run()
|
|
|
+ throws Exception {
|
|
|
+ // If the timeline DT to cancel is different than cached, replace it.
|
|
|
+ // Token to set every time for retry, because when exception happens,
|
|
|
+ // DelegationTokenAuthenticatedURL will reset it to null;
|
|
|
+ if (!timelineDT.equals(token.getDelegationToken())) {
|
|
|
+ token.setDelegationToken((Token) timelineDT);
|
|
|
+ }
|
|
|
+ DelegationTokenAuthenticatedURL authUrl =
|
|
|
+ new DelegationTokenAuthenticatedURL(authenticator,
|
|
|
+ connConfigurator);
|
|
|
+ authUrl.cancelDelegationToken(resURI.toURL(), token, doAsUser);
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ };
|
|
|
+ operateDelegationToken(cancelDTAction);
|
|
|
+ }
|
|
|
+
|
|
|
+ private Object operateDelegationToken(
|
|
|
+ final PrivilegedExceptionAction<?> action)
|
|
|
+ throws IOException, YarnException {
|
|
|
// Set up the retry operation
|
|
|
TimelineClientRetryOp tokenRetryOp = new TimelineClientRetryOp() {
|
|
|
+
|
|
|
@Override
|
|
|
public Object run() throws IOException {
|
|
|
// Try pass the request, if fail, keep retrying
|
|
@@ -355,25 +445,15 @@ public class TimelineClientImpl extends TimelineClient {
|
|
|
UserGroupInformation callerUGI = isProxyAccess ?
|
|
|
UserGroupInformation.getCurrentUser().getRealUser()
|
|
|
: UserGroupInformation.getCurrentUser();
|
|
|
- final String doAsUser = isProxyAccess ?
|
|
|
- UserGroupInformation.getCurrentUser().getShortUserName() : null;
|
|
|
try {
|
|
|
- return callerUGI.doAs(
|
|
|
- new PrivilegedExceptionAction<Token<TimelineDelegationTokenIdentifier>>() {
|
|
|
- @Override
|
|
|
- public Token<TimelineDelegationTokenIdentifier> run() throws Exception {
|
|
|
- DelegationTokenAuthenticatedURL authUrl =
|
|
|
- new DelegationTokenAuthenticatedURL(authenticator, connConfigurator);
|
|
|
- return (Token) authUrl.getDelegationToken(
|
|
|
- resURI.toURL(), token, renewer, doAsUser);
|
|
|
- }
|
|
|
- });
|
|
|
+ return callerUGI.doAs(action);
|
|
|
} catch (UndeclaredThrowableException e) {
|
|
|
throw new IOException(e.getCause());
|
|
|
} catch (InterruptedException e) {
|
|
|
throw new IOException(e);
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
@Override
|
|
|
public boolean shouldRetryOn(Exception e) {
|
|
|
// Only retry on connection exceptions
|
|
@@ -381,8 +461,7 @@ public class TimelineClientImpl extends TimelineClient {
|
|
|
}
|
|
|
};
|
|
|
|
|
|
- return (Token<TimelineDelegationTokenIdentifier>)
|
|
|
- connectionRetry.retryOn(tokenRetryOp);
|
|
|
+ return connectionRetry.retryOn(tokenRetryOp);
|
|
|
}
|
|
|
|
|
|
@Private
|