|
@@ -19,11 +19,14 @@ package org.apache.hadoop.metrics2.sink.timeline;
|
|
|
|
|
|
import org.apache.commons.logging.Log;
|
|
|
import org.apache.commons.logging.LogFactory;
|
|
|
+import org.apache.hadoop.security.UserGroupInformation;
|
|
|
+import org.apache.hadoop.security.authentication.client.AuthenticationException;
|
|
|
import org.codehaus.jackson.map.AnnotationIntrospector;
|
|
|
import org.codehaus.jackson.map.ObjectMapper;
|
|
|
import org.codehaus.jackson.map.annotate.JsonSerialize;
|
|
|
import org.codehaus.jackson.xc.JaxbAnnotationIntrospector;
|
|
|
|
|
|
+import org.apache.hadoop.security.authentication.client.AuthenticatedURL;
|
|
|
import javax.net.ssl.HttpsURLConnection;
|
|
|
import javax.net.ssl.SSLContext;
|
|
|
import javax.net.ssl.SSLSocketFactory;
|
|
@@ -140,9 +143,11 @@ public abstract class AbstractTimelineMetricsSink {
|
|
|
if (LOG.isDebugEnabled()) {
|
|
|
LOG.debug(String.format("Ignoring %s AMS connection exceptions", NUMBER_OF_SKIPPED_COLLECTOR_EXCEPTIONS));
|
|
|
}
|
|
|
- return false;
|
|
|
}
|
|
|
+ } catch (AuthenticationException e) {
|
|
|
+ LOG.error("AuthenticationException while posting metrics to metrics collector", e);
|
|
|
}
|
|
|
+ return false;
|
|
|
}
|
|
|
|
|
|
protected boolean emitMetrics(TimelineMetrics metrics) {
|
|
@@ -186,18 +191,43 @@ public abstract class AbstractTimelineMetricsSink {
|
|
|
}
|
|
|
return sb.toString();
|
|
|
}
|
|
|
+ /**
|
|
|
+ * Uses UGI to find and renew (if needed) kerberos ticket and opens authenticated connection
|
|
|
+ *
|
|
|
+ * @param spec - requested URL
|
|
|
+ * @param userGroupInformation
|
|
|
+ * @return HttpURLConnection - opened authenticated connection
|
|
|
+ * @throws IOException
|
|
|
+ * @throws AuthenticationException
|
|
|
+ */
|
|
|
+ protected HttpURLConnection getAuthenticatedConnection(String spec, UserGroupInformation userGroupInformation) throws IOException, AuthenticationException {
|
|
|
+ //renew ticked if needed
|
|
|
+ if (userGroupInformation != null) {
|
|
|
+ userGroupInformation.checkTGTAndReloginFromKeytab();
|
|
|
+ }
|
|
|
+ //open authenticated connection
|
|
|
+ AuthenticatedURL.Token token = new AuthenticatedURL.Token();
|
|
|
+ URL url = new URL(spec);
|
|
|
+ return new AuthenticatedURL().openConnection(url, token);
|
|
|
+ }
|
|
|
|
|
|
// Get a connection
|
|
|
- protected HttpURLConnection getConnection(String spec) throws IOException {
|
|
|
- return (HttpURLConnection) new URL(spec).openConnection();
|
|
|
+ protected HttpURLConnection getConnection(String spec) throws IOException, AuthenticationException {
|
|
|
+ UserGroupInformation userGroupInformation = UserGroupInformation.getLoginUser();
|
|
|
+
|
|
|
+ if (UserGroupInformation.isSecurityEnabled() && userGroupInformation != null &&
|
|
|
+ userGroupInformation.getAuthenticationMethod() == UserGroupInformation.AuthenticationMethod.KERBEROS) {
|
|
|
+ return getAuthenticatedConnection(spec, userGroupInformation);
|
|
|
+ } else {
|
|
|
+ return (HttpURLConnection) new URL(spec).openConnection();
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
// Get an ssl connection
|
|
|
protected HttpsURLConnection getSSLConnection(String spec)
|
|
|
- throws IOException, IllegalStateException {
|
|
|
+ throws IOException, IllegalStateException, AuthenticationException {
|
|
|
|
|
|
- HttpsURLConnection connection = (HttpsURLConnection) (new URL(spec)
|
|
|
- .openConnection());
|
|
|
+ HttpsURLConnection connection = (HttpsURLConnection) getConnection(spec);
|
|
|
|
|
|
connection.setSSLSocketFactory(sslSocketFactory);
|
|
|
|