|
@@ -23,10 +23,15 @@ import java.io.IOException;
|
|
import java.net.HttpURLConnection;
|
|
import java.net.HttpURLConnection;
|
|
import java.net.URI;
|
|
import java.net.URI;
|
|
import java.net.URL;
|
|
import java.net.URL;
|
|
|
|
+import java.net.URLConnection;
|
|
|
|
+import java.security.GeneralSecurityException;
|
|
import java.util.Arrays;
|
|
import java.util.Arrays;
|
|
import java.util.HashMap;
|
|
import java.util.HashMap;
|
|
import java.util.Map;
|
|
import java.util.Map;
|
|
|
|
|
|
|
|
+import javax.net.ssl.HostnameVerifier;
|
|
|
|
+import javax.net.ssl.HttpsURLConnection;
|
|
|
|
+import javax.net.ssl.SSLSocketFactory;
|
|
import javax.ws.rs.core.MediaType;
|
|
import javax.ws.rs.core.MediaType;
|
|
|
|
|
|
import org.apache.commons.cli.CommandLine;
|
|
import org.apache.commons.cli.CommandLine;
|
|
@@ -42,6 +47,8 @@ import org.apache.hadoop.io.Text;
|
|
import org.apache.hadoop.security.UserGroupInformation;
|
|
import org.apache.hadoop.security.UserGroupInformation;
|
|
import org.apache.hadoop.security.authentication.client.AuthenticatedURL;
|
|
import org.apache.hadoop.security.authentication.client.AuthenticatedURL;
|
|
import org.apache.hadoop.security.authentication.client.AuthenticationException;
|
|
import org.apache.hadoop.security.authentication.client.AuthenticationException;
|
|
|
|
+import org.apache.hadoop.security.authentication.client.ConnectionConfigurator;
|
|
|
|
+import org.apache.hadoop.security.ssl.SSLFactory;
|
|
import org.apache.hadoop.security.token.Token;
|
|
import org.apache.hadoop.security.token.Token;
|
|
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
|
|
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
|
|
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
|
|
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
|
|
@@ -74,7 +81,10 @@ public class TimelineClientImpl extends TimelineClient {
|
|
private static final String RESOURCE_URI_STR = "/ws/v1/timeline/";
|
|
private static final String RESOURCE_URI_STR = "/ws/v1/timeline/";
|
|
private static final String URL_PARAM_USER_NAME = "user.name";
|
|
private static final String URL_PARAM_USER_NAME = "user.name";
|
|
private static final Joiner JOINER = Joiner.on("");
|
|
private static final Joiner JOINER = Joiner.on("");
|
|
|
|
+ public final static int DEFAULT_SOCKET_TIMEOUT = 1 * 60 * 1000; // 1 minute
|
|
|
|
+
|
|
private static Options opts;
|
|
private static Options opts;
|
|
|
|
+
|
|
static {
|
|
static {
|
|
opts = new Options();
|
|
opts = new Options();
|
|
opts.addOption("put", true, "Put the TimelineEntities in a JSON file");
|
|
opts.addOption("put", true, "Put the TimelineEntities in a JSON file");
|
|
@@ -89,15 +99,6 @@ public class TimelineClientImpl extends TimelineClient {
|
|
|
|
|
|
public TimelineClientImpl() {
|
|
public TimelineClientImpl() {
|
|
super(TimelineClientImpl.class.getName());
|
|
super(TimelineClientImpl.class.getName());
|
|
- ClientConfig cc = new DefaultClientConfig();
|
|
|
|
- cc.getClasses().add(YarnJacksonJaxbJsonProvider.class);
|
|
|
|
- if (UserGroupInformation.isSecurityEnabled()) {
|
|
|
|
- urlFactory = new KerberosAuthenticatedURLConnectionFactory();
|
|
|
|
- client = new Client(new URLConnectionClientHandler(urlFactory), cc);
|
|
|
|
- } else {
|
|
|
|
- client = new Client(new URLConnectionClientHandler(
|
|
|
|
- new PseudoAuthenticatedURLConnectionFactory()), cc);
|
|
|
|
- }
|
|
|
|
}
|
|
}
|
|
|
|
|
|
protected void serviceInit(Configuration conf) throws Exception {
|
|
protected void serviceInit(Configuration conf) throws Exception {
|
|
@@ -107,6 +108,17 @@ public class TimelineClientImpl extends TimelineClient {
|
|
if (!isEnabled) {
|
|
if (!isEnabled) {
|
|
LOG.info("Timeline service is not enabled");
|
|
LOG.info("Timeline service is not enabled");
|
|
} else {
|
|
} else {
|
|
|
|
+ ClientConfig cc = new DefaultClientConfig();
|
|
|
|
+ cc.getClasses().add(YarnJacksonJaxbJsonProvider.class);
|
|
|
|
+ ConnectionConfigurator connConfigurator = newConnConfigurator(conf);
|
|
|
|
+ if (UserGroupInformation.isSecurityEnabled()) {
|
|
|
|
+ TimelineAuthenticator.setStaticConnectionConfigurator(connConfigurator);
|
|
|
|
+ urlFactory = new KerberosAuthenticatedURLConnectionFactory(connConfigurator);
|
|
|
|
+ client = new Client(new URLConnectionClientHandler(urlFactory), cc);
|
|
|
|
+ } else {
|
|
|
|
+ client = new Client(new URLConnectionClientHandler(
|
|
|
|
+ new PseudoAuthenticatedURLConnectionFactory(connConfigurator)), cc);
|
|
|
|
+ }
|
|
if (YarnConfiguration.useHttps(conf)) {
|
|
if (YarnConfiguration.useHttps(conf)) {
|
|
resURI = URI
|
|
resURI = URI
|
|
.create(JOINER.join("https://", conf.get(
|
|
.create(JOINER.join("https://", conf.get(
|
|
@@ -182,6 +194,13 @@ public class TimelineClientImpl extends TimelineClient {
|
|
private static class PseudoAuthenticatedURLConnectionFactory
|
|
private static class PseudoAuthenticatedURLConnectionFactory
|
|
implements HttpURLConnectionFactory {
|
|
implements HttpURLConnectionFactory {
|
|
|
|
|
|
|
|
+ private ConnectionConfigurator connConfigurator;
|
|
|
|
+
|
|
|
|
+ public PseudoAuthenticatedURLConnectionFactory(
|
|
|
|
+ ConnectionConfigurator connConfigurator) {
|
|
|
|
+ this.connConfigurator = connConfigurator;
|
|
|
|
+ }
|
|
|
|
+
|
|
@Override
|
|
@Override
|
|
public HttpURLConnection getHttpURLConnection(URL url) throws IOException {
|
|
public HttpURLConnection getHttpURLConnection(URL url) throws IOException {
|
|
Map<String, String> params = new HashMap<String, String>();
|
|
Map<String, String> params = new HashMap<String, String>();
|
|
@@ -191,7 +210,7 @@ public class TimelineClientImpl extends TimelineClient {
|
|
if (LOG.isDebugEnabled()) {
|
|
if (LOG.isDebugEnabled()) {
|
|
LOG.debug("URL with delegation token: " + url);
|
|
LOG.debug("URL with delegation token: " + url);
|
|
}
|
|
}
|
|
- return (HttpURLConnection) url.openConnection();
|
|
|
|
|
|
+ return connConfigurator.configure((HttpURLConnection) url.openConnection());
|
|
}
|
|
}
|
|
|
|
|
|
}
|
|
}
|
|
@@ -202,10 +221,13 @@ public class TimelineClientImpl extends TimelineClient {
|
|
private TimelineAuthenticator authenticator;
|
|
private TimelineAuthenticator authenticator;
|
|
private Token<TimelineDelegationTokenIdentifier> dToken;
|
|
private Token<TimelineDelegationTokenIdentifier> dToken;
|
|
private Text service;
|
|
private Text service;
|
|
|
|
+ private ConnectionConfigurator connConfigurator;
|
|
|
|
|
|
- public KerberosAuthenticatedURLConnectionFactory() {
|
|
|
|
|
|
+ public KerberosAuthenticatedURLConnectionFactory(
|
|
|
|
+ ConnectionConfigurator connConfigurator) {
|
|
token = new AuthenticatedURL.Token();
|
|
token = new AuthenticatedURL.Token();
|
|
authenticator = new TimelineAuthenticator();
|
|
authenticator = new TimelineAuthenticator();
|
|
|
|
+ this.connConfigurator = connConfigurator;
|
|
}
|
|
}
|
|
|
|
|
|
@Override
|
|
@Override
|
|
@@ -226,7 +248,8 @@ public class TimelineClientImpl extends TimelineClient {
|
|
LOG.debug("URL with delegation token: " + url);
|
|
LOG.debug("URL with delegation token: " + url);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
- return new AuthenticatedURL(authenticator).openConnection(url, token);
|
|
|
|
|
|
+ return new AuthenticatedURL(
|
|
|
|
+ authenticator, connConfigurator).openConnection(url, token);
|
|
} catch (AuthenticationException e) {
|
|
} catch (AuthenticationException e) {
|
|
LOG.error("Authentication failed when openning connection [" + url
|
|
LOG.error("Authentication failed when openning connection [" + url
|
|
+ "] with token [" + token + "].", e);
|
|
+ "] with token [" + token + "].", e);
|
|
@@ -255,6 +278,57 @@ public class TimelineClientImpl extends TimelineClient {
|
|
|
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ private static ConnectionConfigurator newConnConfigurator(Configuration conf) {
|
|
|
|
+ try {
|
|
|
|
+ return newSslConnConfigurator(DEFAULT_SOCKET_TIMEOUT, conf);
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
+ LOG.debug("Cannot load customized ssl related configuration. " +
|
|
|
|
+ "Fallback to system-generic settings.", e);
|
|
|
|
+ return DEFAULT_TIMEOUT_CONN_CONFIGURATOR;
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private static final ConnectionConfigurator DEFAULT_TIMEOUT_CONN_CONFIGURATOR =
|
|
|
|
+ new ConnectionConfigurator() {
|
|
|
|
+ @Override
|
|
|
|
+ public HttpURLConnection configure(HttpURLConnection conn)
|
|
|
|
+ throws IOException {
|
|
|
|
+ setTimeouts(conn, DEFAULT_SOCKET_TIMEOUT);
|
|
|
|
+ return conn;
|
|
|
|
+ }
|
|
|
|
+ };
|
|
|
|
+
|
|
|
|
+ private static ConnectionConfigurator newSslConnConfigurator(final int timeout,
|
|
|
|
+ Configuration conf) throws IOException, GeneralSecurityException {
|
|
|
|
+ final SSLFactory factory;
|
|
|
|
+ final SSLSocketFactory sf;
|
|
|
|
+ final HostnameVerifier hv;
|
|
|
|
+
|
|
|
|
+ factory = new SSLFactory(SSLFactory.Mode.CLIENT, conf);
|
|
|
|
+ factory.init();
|
|
|
|
+ sf = factory.createSSLSocketFactory();
|
|
|
|
+ hv = factory.getHostnameVerifier();
|
|
|
|
+
|
|
|
|
+ return new ConnectionConfigurator() {
|
|
|
|
+ @Override
|
|
|
|
+ public HttpURLConnection configure(HttpURLConnection conn)
|
|
|
|
+ throws IOException {
|
|
|
|
+ if (conn instanceof HttpsURLConnection) {
|
|
|
|
+ HttpsURLConnection c = (HttpsURLConnection) conn;
|
|
|
|
+ c.setSSLSocketFactory(sf);
|
|
|
|
+ c.setHostnameVerifier(hv);
|
|
|
|
+ }
|
|
|
|
+ setTimeouts(conn, timeout);
|
|
|
|
+ return conn;
|
|
|
|
+ }
|
|
|
|
+ };
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private static void setTimeouts(URLConnection connection, int socketTimeout) {
|
|
|
|
+ connection.setConnectTimeout(socketTimeout);
|
|
|
|
+ connection.setReadTimeout(socketTimeout);
|
|
|
|
+ }
|
|
|
|
+
|
|
public static void main(String[] argv) throws Exception {
|
|
public static void main(String[] argv) throws Exception {
|
|
CommandLine cliParser = new GnuParser().parse(opts, argv);
|
|
CommandLine cliParser = new GnuParser().parse(opts, argv);
|
|
if (cliParser.hasOption("put")) {
|
|
if (cliParser.hasOption("put")) {
|