|
@@ -18,24 +18,43 @@
|
|
|
|
|
|
package org.apache.hadoop.yarn.client.api.impl;
|
|
|
|
|
|
+import java.io.File;
|
|
|
import java.io.IOException;
|
|
|
+import java.net.HttpURLConnection;
|
|
|
import java.net.URI;
|
|
|
+import java.net.URL;
|
|
|
import java.util.Arrays;
|
|
|
+import java.util.HashMap;
|
|
|
+import java.util.Map;
|
|
|
|
|
|
import javax.ws.rs.core.MediaType;
|
|
|
|
|
|
+import org.apache.commons.cli.CommandLine;
|
|
|
+import org.apache.commons.cli.GnuParser;
|
|
|
+import org.apache.commons.cli.HelpFormatter;
|
|
|
+import org.apache.commons.cli.Options;
|
|
|
import org.apache.commons.logging.Log;
|
|
|
import org.apache.commons.logging.LogFactory;
|
|
|
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
|
|
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
+import org.apache.hadoop.io.Text;
|
|
|
+import org.apache.hadoop.security.UserGroupInformation;
|
|
|
+import org.apache.hadoop.security.authentication.client.AuthenticatedURL;
|
|
|
+import org.apache.hadoop.security.authentication.client.AuthenticationException;
|
|
|
+import org.apache.hadoop.security.token.Token;
|
|
|
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
|
|
|
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
|
|
|
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
|
|
|
import org.apache.hadoop.yarn.client.api.TimelineClient;
|
|
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
|
|
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
|
|
+import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier;
|
|
|
+import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenSelector;
|
|
|
+import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
|
|
|
import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider;
|
|
|
+import org.codehaus.jackson.map.ObjectMapper;
|
|
|
|
|
|
import com.google.common.annotations.VisibleForTesting;
|
|
|
import com.google.common.base.Joiner;
|
|
@@ -44,6 +63,8 @@ import com.sun.jersey.api.client.ClientResponse;
|
|
|
import com.sun.jersey.api.client.WebResource;
|
|
|
import com.sun.jersey.api.client.config.ClientConfig;
|
|
|
import com.sun.jersey.api.client.config.DefaultClientConfig;
|
|
|
+import com.sun.jersey.client.urlconnection.HttpURLConnectionFactory;
|
|
|
+import com.sun.jersey.client.urlconnection.URLConnectionClientHandler;
|
|
|
|
|
|
@Private
|
|
|
@Unstable
|
|
@@ -52,16 +73,29 @@ public class TimelineClientImpl extends TimelineClient {
|
|
|
private static final Log LOG = LogFactory.getLog(TimelineClientImpl.class);
|
|
|
private static final String RESOURCE_URI_STR = "/ws/v1/timeline/";
|
|
|
private static final Joiner JOINER = Joiner.on("");
|
|
|
+ private static Options opts;
|
|
|
+ static {
|
|
|
+ opts = new Options();
|
|
|
+ opts.addOption("put", true, "Put the TimelineEntities in a JSON file");
|
|
|
+ opts.getOption("put").setArgName("Path to the JSON file");
|
|
|
+ opts.addOption("help", false, "Print usage");
|
|
|
+ }
|
|
|
|
|
|
private Client client;
|
|
|
private URI resURI;
|
|
|
private boolean isEnabled;
|
|
|
+ private TimelineAuthenticatedURLConnectionFactory urlFactory;
|
|
|
|
|
|
public TimelineClientImpl() {
|
|
|
super(TimelineClientImpl.class.getName());
|
|
|
ClientConfig cc = new DefaultClientConfig();
|
|
|
cc.getClasses().add(YarnJacksonJaxbJsonProvider.class);
|
|
|
- client = Client.create(cc);
|
|
|
+ if (UserGroupInformation.isSecurityEnabled()) {
|
|
|
+ urlFactory = new TimelineAuthenticatedURLConnectionFactory();
|
|
|
+ client = new Client(new URLConnectionClientHandler(urlFactory), cc);
|
|
|
+ } else {
|
|
|
+ client = Client.create(cc);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
protected void serviceInit(Configuration conf) throws Exception {
|
|
@@ -83,6 +117,9 @@ public class TimelineClientImpl extends TimelineClient {
|
|
|
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WEBAPP_ADDRESS),
|
|
|
RESOURCE_URI_STR));
|
|
|
}
|
|
|
+ if (UserGroupInformation.isSecurityEnabled()) {
|
|
|
+ urlFactory.setService(TimelineUtils.buildTimelineTokenService(conf));
|
|
|
+ }
|
|
|
LOG.info("Timeline service address: " + resURI);
|
|
|
}
|
|
|
super.serviceInit(conf);
|
|
@@ -124,6 +161,13 @@ public class TimelineClientImpl extends TimelineClient {
|
|
|
return resp.getEntity(TimelinePutResponse.class);
|
|
|
}
|
|
|
|
|
|
+ @Override
|
|
|
+ public Token<TimelineDelegationTokenIdentifier> getDelegationToken(
|
|
|
+ String renewer) throws IOException, YarnException {
|
|
|
+ return TimelineAuthenticator.getDelegationToken(resURI.toURL(),
|
|
|
+ urlFactory.token, renewer);
|
|
|
+ }
|
|
|
+
|
|
|
@Private
|
|
|
@VisibleForTesting
|
|
|
public ClientResponse doPostingEntities(TimelineEntities entities) {
|
|
@@ -133,4 +177,138 @@ public class TimelineClientImpl extends TimelineClient {
|
|
|
.post(ClientResponse.class, entities);
|
|
|
}
|
|
|
|
|
|
+ private static class TimelineAuthenticatedURLConnectionFactory
|
|
|
+ implements HttpURLConnectionFactory {
|
|
|
+
|
|
|
+ private AuthenticatedURL.Token token;
|
|
|
+ private TimelineAuthenticator authenticator;
|
|
|
+ private Token<TimelineDelegationTokenIdentifier> dToken;
|
|
|
+ private Text service;
|
|
|
+
|
|
|
+ public TimelineAuthenticatedURLConnectionFactory() {
|
|
|
+ token = new AuthenticatedURL.Token();
|
|
|
+ authenticator = new TimelineAuthenticator();
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public HttpURLConnection getHttpURLConnection(URL url) throws IOException {
|
|
|
+ try {
|
|
|
+ if (dToken == null) {
|
|
|
+ //TODO: need to take care of the renew case
|
|
|
+ dToken = selectToken();
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
+ LOG.debug("Timeline delegation token: " + dToken.toString());
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if (dToken != null) {
|
|
|
+ Map<String, String> params = new HashMap<String, String>();
|
|
|
+ TimelineAuthenticator.injectDelegationToken(params, dToken);
|
|
|
+ url = TimelineAuthenticator.appendParams(url, params);
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
+ LOG.debug("URL with delegation token: " + url);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return new AuthenticatedURL(authenticator).openConnection(url, token);
|
|
|
+ } catch (AuthenticationException e) {
|
|
|
+ LOG.error("Authentication failed when openning connection [" + url
|
|
|
+ + "] with token [" + token + "].", e);
|
|
|
+ throw new IOException(e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private Token<TimelineDelegationTokenIdentifier> selectToken() {
|
|
|
+ UserGroupInformation ugi;
|
|
|
+ try {
|
|
|
+ ugi = UserGroupInformation.getCurrentUser();
|
|
|
+ } catch (IOException e) {
|
|
|
+ String msg = "Error when getting the current user";
|
|
|
+ LOG.error(msg, e);
|
|
|
+ throw new YarnRuntimeException(msg, e);
|
|
|
+ }
|
|
|
+ TimelineDelegationTokenSelector tokenSelector =
|
|
|
+ new TimelineDelegationTokenSelector();
|
|
|
+ return tokenSelector.selectToken(
|
|
|
+ service, ugi.getCredentials().getAllTokens());
|
|
|
+ }
|
|
|
+
|
|
|
+ public void setService(Text service) {
|
|
|
+ this.service = service;
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ public static void main(String[] argv) throws Exception {
|
|
|
+ CommandLine cliParser = new GnuParser().parse(opts, argv);
|
|
|
+ if (cliParser.hasOption("put")) {
|
|
|
+ String path = cliParser.getOptionValue("put");
|
|
|
+ if (path != null && path.length() > 0) {
|
|
|
+ putTimelineEntitiesInJSONFile(path);
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ printUsage();
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Put timeline data in a JSON file via command line.
|
|
|
+ *
|
|
|
+ * @param path
|
|
|
+ * path to the {@link TimelineEntities} JSON file
|
|
|
+ */
|
|
|
+ private static void putTimelineEntitiesInJSONFile(String path) {
|
|
|
+ File jsonFile = new File(path);
|
|
|
+ if (!jsonFile.exists()) {
|
|
|
+ System.out.println("Error: File [" + jsonFile.getAbsolutePath()
|
|
|
+ + "] doesn't exist");
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ ObjectMapper mapper = new ObjectMapper();
|
|
|
+ YarnJacksonJaxbJsonProvider.configObjectMapper(mapper);
|
|
|
+ TimelineEntities entities = null;
|
|
|
+ try {
|
|
|
+ entities = mapper.readValue(jsonFile, TimelineEntities.class);
|
|
|
+ } catch (Exception e) {
|
|
|
+ System.err.println("Error: " + e.getMessage());
|
|
|
+ e.printStackTrace(System.err);
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ Configuration conf = new YarnConfiguration();
|
|
|
+ TimelineClient client = TimelineClient.createTimelineClient();
|
|
|
+ client.init(conf);
|
|
|
+ client.start();
|
|
|
+ try {
|
|
|
+ if (UserGroupInformation.isSecurityEnabled()
|
|
|
+ && conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, false)) {
|
|
|
+ Token<TimelineDelegationTokenIdentifier> token =
|
|
|
+ client.getDelegationToken(
|
|
|
+ UserGroupInformation.getCurrentUser().getUserName());
|
|
|
+ UserGroupInformation.getCurrentUser().addToken(token);
|
|
|
+ }
|
|
|
+ TimelinePutResponse response = client.putEntities(
|
|
|
+ entities.getEntities().toArray(
|
|
|
+ new TimelineEntity[entities.getEntities().size()]));
|
|
|
+ if (response.getErrors().size() == 0) {
|
|
|
+ System.out.println("Timeline data is successfully put");
|
|
|
+ } else {
|
|
|
+ for (TimelinePutResponse.TimelinePutError error : response.getErrors()) {
|
|
|
+ System.out.println("TimelineEntity [" + error.getEntityType() + ":" +
|
|
|
+ error.getEntityId() + "] is not successfully put. Error code: " +
|
|
|
+ error.getErrorCode());
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ System.err.println("Error: " + e.getMessage());
|
|
|
+ e.printStackTrace(System.err);
|
|
|
+ } finally {
|
|
|
+ client.stop();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Helper function to print out usage
|
|
|
+ */
|
|
|
+ private static void printUsage() {
|
|
|
+ new HelpFormatter().printHelp("TimelineClient", opts);
|
|
|
+ }
|
|
|
+
|
|
|
}
|