|
@@ -20,14 +20,14 @@ package org.apache.hadoop.yarn.client.api.impl;
|
|
|
|
|
|
import java.io.File;
|
|
|
import java.io.IOException;
|
|
|
+import java.lang.reflect.UndeclaredThrowableException;
|
|
|
import java.net.HttpURLConnection;
|
|
|
import java.net.URI;
|
|
|
import java.net.URL;
|
|
|
import java.net.URLConnection;
|
|
|
import java.security.GeneralSecurityException;
|
|
|
+import java.security.PrivilegedExceptionAction;
|
|
|
import java.util.Arrays;
|
|
|
-import java.util.HashMap;
|
|
|
-import java.util.Map;
|
|
|
|
|
|
import javax.net.ssl.HostnameVerifier;
|
|
|
import javax.net.ssl.HttpsURLConnection;
|
|
@@ -43,13 +43,14 @@ import org.apache.commons.logging.LogFactory;
|
|
|
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
|
|
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
-import org.apache.hadoop.io.Text;
|
|
|
import org.apache.hadoop.security.UserGroupInformation;
|
|
|
-import org.apache.hadoop.security.authentication.client.AuthenticatedURL;
|
|
|
-import org.apache.hadoop.security.authentication.client.AuthenticationException;
|
|
|
import org.apache.hadoop.security.authentication.client.ConnectionConfigurator;
|
|
|
import org.apache.hadoop.security.ssl.SSLFactory;
|
|
|
import org.apache.hadoop.security.token.Token;
|
|
|
+import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticatedURL;
|
|
|
+import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticator;
|
|
|
+import org.apache.hadoop.security.token.delegation.web.KerberosDelegationTokenAuthenticator;
|
|
|
+import org.apache.hadoop.security.token.delegation.web.PseudoDelegationTokenAuthenticator;
|
|
|
import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
|
|
|
import org.apache.hadoop.yarn.api.records.timeline.TimelineDomains;
|
|
|
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
|
|
@@ -60,8 +61,6 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
|
|
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
|
|
import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier;
|
|
|
-import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenSelector;
|
|
|
-import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
|
|
|
import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider;
|
|
|
import org.codehaus.jackson.map.ObjectMapper;
|
|
|
|
|
@@ -81,7 +80,6 @@ public class TimelineClientImpl extends TimelineClient {
|
|
|
|
|
|
private static final Log LOG = LogFactory.getLog(TimelineClientImpl.class);
|
|
|
private static final String RESOURCE_URI_STR = "/ws/v1/timeline/";
|
|
|
- private static final String URL_PARAM_USER_NAME = "user.name";
|
|
|
private static final Joiner JOINER = Joiner.on("");
|
|
|
public final static int DEFAULT_SOCKET_TIMEOUT = 1 * 60 * 1000; // 1 minute
|
|
|
|
|
@@ -99,9 +97,11 @@ public class TimelineClientImpl extends TimelineClient {
|
|
|
}
|
|
|
|
|
|
private Client client;
|
|
|
+ private ConnectionConfigurator connConfigurator;
|
|
|
+ private DelegationTokenAuthenticator authenticator;
|
|
|
+ private DelegationTokenAuthenticatedURL.Token token;
|
|
|
private URI resURI;
|
|
|
private boolean isEnabled;
|
|
|
- private KerberosAuthenticatedURLConnectionFactory urlFactory;
|
|
|
|
|
|
public TimelineClientImpl() {
|
|
|
super(TimelineClientImpl.class.getName());
|
|
@@ -116,15 +116,17 @@ public class TimelineClientImpl extends TimelineClient {
|
|
|
} else {
|
|
|
ClientConfig cc = new DefaultClientConfig();
|
|
|
cc.getClasses().add(YarnJacksonJaxbJsonProvider.class);
|
|
|
- ConnectionConfigurator connConfigurator = newConnConfigurator(conf);
|
|
|
+ connConfigurator = newConnConfigurator(conf);
|
|
|
if (UserGroupInformation.isSecurityEnabled()) {
|
|
|
- TimelineAuthenticator.setStaticConnectionConfigurator(connConfigurator);
|
|
|
- urlFactory = new KerberosAuthenticatedURLConnectionFactory(connConfigurator);
|
|
|
- client = new Client(new URLConnectionClientHandler(urlFactory), cc);
|
|
|
+ authenticator = new KerberosDelegationTokenAuthenticator();
|
|
|
} else {
|
|
|
- client = new Client(new URLConnectionClientHandler(
|
|
|
- new PseudoAuthenticatedURLConnectionFactory(connConfigurator)), cc);
|
|
|
+ authenticator = new PseudoDelegationTokenAuthenticator();
|
|
|
}
|
|
|
+ authenticator.setConnectionConfigurator(connConfigurator);
|
|
|
+ client = new Client(new URLConnectionClientHandler(
|
|
|
+ new TimelineURLConnectionFactory()), cc);
|
|
|
+ token = new DelegationTokenAuthenticatedURL.Token();
|
|
|
+
|
|
|
if (YarnConfiguration.useHttps(conf)) {
|
|
|
resURI = URI
|
|
|
.create(JOINER.join("https://", conf.get(
|
|
@@ -137,9 +139,6 @@ public class TimelineClientImpl extends TimelineClient {
|
|
|
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WEBAPP_ADDRESS),
|
|
|
RESOURCE_URI_STR));
|
|
|
}
|
|
|
- if (UserGroupInformation.isSecurityEnabled()) {
|
|
|
- urlFactory.setService(TimelineUtils.buildTimelineTokenService(conf));
|
|
|
- }
|
|
|
LOG.info("Timeline service address: " + resURI);
|
|
|
}
|
|
|
super.serviceInit(conf);
|
|
@@ -199,11 +198,34 @@ public class TimelineClientImpl extends TimelineClient {
|
|
|
return resp;
|
|
|
}
|
|
|
|
|
|
+ @SuppressWarnings("unchecked")
|
|
|
@Override
|
|
|
public Token<TimelineDelegationTokenIdentifier> getDelegationToken(
|
|
|
- String renewer) throws IOException, YarnException {
|
|
|
- return TimelineAuthenticator.getDelegationToken(resURI.toURL(),
|
|
|
- urlFactory.token, renewer);
|
|
|
+ final String renewer) throws IOException, YarnException {
|
|
|
+ boolean isProxyAccess =
|
|
|
+ UserGroupInformation.getCurrentUser().getAuthenticationMethod()
|
|
|
+ == UserGroupInformation.AuthenticationMethod.PROXY;
|
|
|
+ UserGroupInformation callerUGI = isProxyAccess ?
|
|
|
+ UserGroupInformation.getCurrentUser().getRealUser()
|
|
|
+ : UserGroupInformation.getCurrentUser();
|
|
|
+ final String doAsUser = isProxyAccess ?
|
|
|
+ UserGroupInformation.getCurrentUser().getShortUserName() : null;
|
|
|
+ try {
|
|
|
+ return callerUGI.doAs(
|
|
|
+ new PrivilegedExceptionAction<Token<TimelineDelegationTokenIdentifier>>() {
|
|
|
+ @Override
|
|
|
+ public Token<TimelineDelegationTokenIdentifier> run() throws Exception {
|
|
|
+ DelegationTokenAuthenticatedURL authUrl =
|
|
|
+ new DelegationTokenAuthenticatedURL(authenticator, connConfigurator);
|
|
|
+ return (Token) authUrl.getDelegationToken(
|
|
|
+ resURI.toURL(), token, renewer, doAsUser);
|
|
|
+ }
|
|
|
+ });
|
|
|
+ } catch (UndeclaredThrowableException e) {
|
|
|
+ throw new IOException(e.getCause());
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ throw new IOException(e);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
@Private
|
|
@@ -223,91 +245,35 @@ public class TimelineClientImpl extends TimelineClient {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private static class PseudoAuthenticatedURLConnectionFactory
|
|
|
- implements HttpURLConnectionFactory {
|
|
|
-
|
|
|
- private ConnectionConfigurator connConfigurator;
|
|
|
-
|
|
|
- public PseudoAuthenticatedURLConnectionFactory(
|
|
|
- ConnectionConfigurator connConfigurator) {
|
|
|
- this.connConfigurator = connConfigurator;
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public HttpURLConnection getHttpURLConnection(URL url) throws IOException {
|
|
|
- Map<String, String> params = new HashMap<String, String>();
|
|
|
- params.put(URL_PARAM_USER_NAME,
|
|
|
- UserGroupInformation.getCurrentUser().getShortUserName());
|
|
|
- url = TimelineAuthenticator.appendParams(url, params);
|
|
|
- if (LOG.isDebugEnabled()) {
|
|
|
- LOG.debug("URL with delegation token: " + url);
|
|
|
- }
|
|
|
- return connConfigurator.configure((HttpURLConnection) url.openConnection());
|
|
|
- }
|
|
|
-
|
|
|
- }
|
|
|
- private static class KerberosAuthenticatedURLConnectionFactory
|
|
|
+ private class TimelineURLConnectionFactory
|
|
|
implements HttpURLConnectionFactory {
|
|
|
|
|
|
- private AuthenticatedURL.Token token;
|
|
|
- private TimelineAuthenticator authenticator;
|
|
|
- private Token<TimelineDelegationTokenIdentifier> dToken;
|
|
|
- private Text service;
|
|
|
- private ConnectionConfigurator connConfigurator;
|
|
|
-
|
|
|
- public KerberosAuthenticatedURLConnectionFactory(
|
|
|
- ConnectionConfigurator connConfigurator) {
|
|
|
- token = new AuthenticatedURL.Token();
|
|
|
- authenticator = new TimelineAuthenticator();
|
|
|
- this.connConfigurator = connConfigurator;
|
|
|
- }
|
|
|
-
|
|
|
@Override
|
|
|
- public HttpURLConnection getHttpURLConnection(URL url) throws IOException {
|
|
|
+ public HttpURLConnection getHttpURLConnection(final URL url) throws IOException {
|
|
|
+ boolean isProxyAccess =
|
|
|
+ UserGroupInformation.getCurrentUser().getAuthenticationMethod()
|
|
|
+ == UserGroupInformation.AuthenticationMethod.PROXY;
|
|
|
+ UserGroupInformation callerUGI = isProxyAccess ?
|
|
|
+ UserGroupInformation.getCurrentUser().getRealUser()
|
|
|
+ : UserGroupInformation.getCurrentUser();
|
|
|
+ final String doAsUser = isProxyAccess ?
|
|
|
+ UserGroupInformation.getCurrentUser().getShortUserName() : null;
|
|
|
try {
|
|
|
- if (dToken == null) {
|
|
|
- //TODO: need to take care of the renew case
|
|
|
- dToken = selectToken();
|
|
|
- if (LOG.isDebugEnabled()) {
|
|
|
- LOG.debug("Timeline delegation token: " + dToken.toString());
|
|
|
+ return callerUGI.doAs(new PrivilegedExceptionAction<HttpURLConnection>() {
|
|
|
+ @Override
|
|
|
+ public HttpURLConnection run() throws Exception {
|
|
|
+ return new DelegationTokenAuthenticatedURL(
|
|
|
+ authenticator, connConfigurator).openConnection(url, token,
|
|
|
+ doAsUser);
|
|
|
}
|
|
|
- }
|
|
|
- if (dToken != null) {
|
|
|
- Map<String, String> params = new HashMap<String, String>();
|
|
|
- TimelineAuthenticator.injectDelegationToken(params, dToken);
|
|
|
- url = TimelineAuthenticator.appendParams(url, params);
|
|
|
- if (LOG.isDebugEnabled()) {
|
|
|
- LOG.debug("URL with delegation token: " + url);
|
|
|
- }
|
|
|
- }
|
|
|
- return new AuthenticatedURL(
|
|
|
- authenticator, connConfigurator).openConnection(url, token);
|
|
|
- } catch (AuthenticationException e) {
|
|
|
- LOG.error("Authentication failed when openning connection [" + url
|
|
|
- + "] with token [" + token + "].", e);
|
|
|
+ });
|
|
|
+ } catch (UndeclaredThrowableException e) {
|
|
|
+ throw new IOException(e.getCause());
|
|
|
+ } catch (InterruptedException e) {
|
|
|
throw new IOException(e);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private Token<TimelineDelegationTokenIdentifier> selectToken() {
|
|
|
- UserGroupInformation ugi;
|
|
|
- try {
|
|
|
- ugi = UserGroupInformation.getCurrentUser();
|
|
|
- } catch (IOException e) {
|
|
|
- String msg = "Error when getting the current user";
|
|
|
- LOG.error(msg, e);
|
|
|
- throw new YarnRuntimeException(msg, e);
|
|
|
- }
|
|
|
- TimelineDelegationTokenSelector tokenSelector =
|
|
|
- new TimelineDelegationTokenSelector();
|
|
|
- return tokenSelector.selectToken(
|
|
|
- service, ugi.getCredentials().getAllTokens());
|
|
|
- }
|
|
|
-
|
|
|
- public void setService(Text service) {
|
|
|
- this.service = service;
|
|
|
- }
|
|
|
-
|
|
|
}
|
|
|
|
|
|
private static ConnectionConfigurator newConnConfigurator(Configuration conf) {
|