
HDFS-13955. RBF: Support secure Namenode in NamenodeHeartbeatService. Contributed by CR Hota.

Ayush Saxena, 6 years ago
Commit f544121239

+ 11 - 3
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/FederationUtil.java

@@ -31,6 +31,8 @@ import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
 import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver;
 import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
+import org.apache.hadoop.hdfs.web.URLConnectionFactory;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.VersionInfo;
 import org.codehaus.jettison.json.JSONArray;
 import org.codehaus.jettison.json.JSONException;
@@ -55,9 +57,12 @@ public final class FederationUtil {
    *
    * @param beanQuery JMX bean.
    * @param webAddress Web address of the JMX endpoint.
+   * @param connectionFactory to open http/https connection.
+   * @param scheme to use for URL connection.
    * @return JSON with the JMX data
    */
-  public static JSONArray getJmx(String beanQuery, String webAddress) {
+  public static JSONArray getJmx(String beanQuery, String webAddress,
+      URLConnectionFactory connectionFactory, String scheme) {
     JSONArray ret = null;
     BufferedReader reader = null;
     try {
@@ -68,8 +73,11 @@ public final class FederationUtil {
         host = webAddressSplit[0];
         port = Integer.parseInt(webAddressSplit[1]);
       }
-      URL jmxURL = new URL("http", host, port, "/jmx?qry=" + beanQuery);
-      URLConnection conn = jmxURL.openConnection();
+      URL jmxURL = new URL(scheme, host, port, "/jmx?qry=" + beanQuery);
+      LOG.debug("JMX URL: {}", jmxURL);
+      // Create a URL connection
+      URLConnection conn = connectionFactory.openConnection(
+          jmxURL, UserGroupInformation.isSecurityEnabled());
       conn.setConnectTimeout(5 * 1000);
       conn.setReadTimeout(5 * 1000);
       InputStream in = conn.getInputStream();
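
The new getJmx() signature pushes scheme selection and connection creation onto the caller: connectionFactory.openConnection(jmxURL, UserGroupInformation.isSecurityEnabled()) can negotiate SPNEGO and SSL, which the old plain jmxURL.openConnection() could not. Below is a minimal sketch of calling the new method the same way the heartbeat service does; the class name, web address, and configuration values are illustrative assumptions, not part of the patch.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.server.federation.router.FederationUtil;
import org.apache.hadoop.hdfs.web.URLConnectionFactory;
import org.codehaus.jettison.json.JSONArray;

public class JmxQueryExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration();

    // Build the factory and pick the scheme the same way the patched
    // NamenodeHeartbeatService does in serviceInit().
    URLConnectionFactory factory =
        URLConnectionFactory.newDefaultURLConnectionFactory(conf);
    String scheme =
        DFSUtil.getHttpPolicy(conf).isHttpEnabled() ? "http" : "https";

    // "namenode.example.com:9870" is a placeholder NameNode web address.
    JSONArray beans = FederationUtil.getJmx(
        "Hadoop:service=NameNode,name=FSNamesystem*",
        "namenode.example.com:9870", factory, scheme);
    if (beans != null) {
      System.out.println("Fetched " + beans.length() + " FSNamesystem bean(s)");
    }
  }
}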

+ 24 - 3
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/NamenodeHeartbeatService.java

@@ -40,6 +40,7 @@ import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.hdfs.tools.DFSHAAdmin;
 import org.apache.hadoop.hdfs.tools.NNHAServiceTarget;
+import org.apache.hadoop.hdfs.web.URLConnectionFactory;
 import org.codehaus.jettison.json.JSONArray;
 import org.codehaus.jettison.json.JSONObject;
 import org.slf4j.Logger;
@@ -86,7 +87,10 @@ public class NamenodeHeartbeatService extends PeriodicService {
   private String lifelineAddress;
   /** HTTP address for the namenode. */
   private String webAddress;
-
+  /** Connection factory for JMX calls. */
+  private URLConnectionFactory connectionFactory;
+  /** URL scheme to use for JMX calls. */
+  private String scheme;
   /**
    * Create a new Namenode status updater.
    * @param resolver Namenode resolver service to handle NN registration.
@@ -147,6 +151,12 @@ public class NamenodeHeartbeatService extends PeriodicService {
         DFSUtil.getNamenodeWebAddr(conf, nameserviceId, namenodeId);
     LOG.info("{} Web address: {}", nnDesc, webAddress);
 
+    this.connectionFactory =
+        URLConnectionFactory.newDefaultURLConnectionFactory(conf);
+
+    this.scheme =
+        DFSUtil.getHttpPolicy(conf).isHttpEnabled() ? "http" : "https";
+
     this.setIntervalMs(conf.getLong(
         DFS_ROUTER_HEARTBEAT_INTERVAL_MS,
         DFS_ROUTER_HEARTBEAT_INTERVAL_MS_DEFAULT));
@@ -329,7 +339,8 @@ public class NamenodeHeartbeatService extends PeriodicService {
     try {
       // TODO part of this should be moved to its own utility
       String query = "Hadoop:service=NameNode,name=FSNamesystem*";
-      JSONArray aux = FederationUtil.getJmx(query, address);
+      JSONArray aux = FederationUtil.getJmx(
+          query, address, connectionFactory, scheme);
       if (aux != null) {
         for (int i = 0; i < aux.length(); i++) {
           JSONObject jsonObject = aux.getJSONObject(i);
@@ -364,4 +375,14 @@ public class NamenodeHeartbeatService extends PeriodicService {
       LOG.error("Cannot get stat from {} using JMX", getNamenodeDesc(), e);
     }
   }
-}
+
+  @Override
+  protected void serviceStop() throws Exception {
+    LOG.info("Stopping NamenodeHeartbeat service for, NS {} NN {} ",
+        this.nameserviceId, this.namenodeId);
+    if (this.connectionFactory != null) {
+      this.connectionFactory.destroy();
+    }
+    super.serviceStop();
+  }
+}
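
The scheme chosen in serviceInit() falls back to "https" whenever the HTTP policy does not enable plain HTTP. A small sketch of that selection under an HTTPS-only policy follows; the class name is illustrative and not part of the patch.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.http.HttpConfig;

public class SchemeSelectionExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Force HTTPS-only, as a secure cluster typically would.
    conf.set(DFSConfigKeys.DFS_HTTP_POLICY_KEY,
        HttpConfig.Policy.HTTPS_ONLY.name());

    // Mirrors the selection added to serviceInit() above.
    String scheme =
        DFSUtil.getHttpPolicy(conf).isHttpEnabled() ? "http" : "https";
    System.out.println("JMX calls will use scheme: " + scheme); // https
  }
}

Note that isHttpEnabled() is also true for the HTTP_AND_HTTPS policy, so a mixed policy keeps using "http" for the JMX calls.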

+ 50 - 0
hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterNamenodeMonitoring.java

@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.federation.router;
 import static java.util.Arrays.asList;
 import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.getStateStoreConfiguration;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertEquals;
 
 import java.util.ArrayList;
 import java.util.Collection;
@@ -32,6 +33,7 @@ import java.util.TreeSet;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.LogVerificationAppender;
 import org.apache.hadoop.hdfs.server.federation.MockNamenode;
 import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
 import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
@@ -40,8 +42,10 @@ import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver;
 import org.apache.hadoop.hdfs.server.federation.resolver.MembershipNamenodeResolver;
 import org.apache.hadoop.hdfs.server.federation.resolver.MountTableResolver;
 import org.apache.hadoop.hdfs.server.federation.resolver.NamenodeStatusReport;
+import org.apache.hadoop.http.HttpConfig;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Time;
+import org.apache.log4j.Level;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -264,4 +268,50 @@ public class TestRouterNamenodeMonitoring {
     assertTrue(actualSet + " does not contain all " + expected,
         actualSet.containsAll(expected));
   }
+
+  @Test
+  public void testJmxUrlHTTP() {
+    verifyUrlSchemes(HttpConfig.Policy.HTTP_ONLY.name());
+  }
+
+  @Test
+  public void testJmxUrlHTTPs() {
+    verifyUrlSchemes(HttpConfig.Policy.HTTPS_ONLY.name());
+  }
+
+  private void verifyUrlSchemes(String scheme) {
+
+    // Attach our own log appender so we can verify output
+    final LogVerificationAppender appender =
+        new LogVerificationAppender();
+    final org.apache.log4j.Logger logger =
+        org.apache.log4j.Logger.getRootLogger();
+    logger.addAppender(appender);
+    logger.setLevel(Level.DEBUG);
+
+    // Setup and start the Router
+    Configuration conf = getNamenodesConfig();
+    conf.set(DFSConfigKeys.DFS_HTTP_POLICY_KEY, scheme);
+    Configuration routerConf = new RouterConfigBuilder(conf)
+        .heartbeat(true)
+        .build();
+    routerConf.set(RBFConfigKeys.DFS_ROUTER_RPC_ADDRESS_KEY, "0.0.0.0:0");
+    routerConf.set(RBFConfigKeys.DFS_ROUTER_MONITOR_NAMENODE, "ns1.nn0");
+    router = new Router();
+    router.init(routerConf);
+
+    // Test the heartbeat services of the Router
+    Collection<NamenodeHeartbeatService> heartbeatServices =
+        router.getNamenodeHeartbeatServices();
+    for (NamenodeHeartbeatService heartbeatService : heartbeatServices) {
+      heartbeatService.getNamenodeStatusReport();
+    }
+    if (HttpConfig.Policy.HTTPS_ONLY.name().equals(scheme)) {
+      assertEquals(1, appender.countLinesWithMessage("JMX URL: https://"));
+      assertEquals(0, appender.countLinesWithMessage("JMX URL: http://"));
+    } else {
+      assertEquals(1, appender.countLinesWithMessage("JMX URL: http://"));
+      assertEquals(0, appender.countLinesWithMessage("JMX URL: https://"));
+    }
+  }
 }
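
The assertions above hinge on the new LOG.debug("JMX URL: {}", jmxURL) line in FederationUtil being captured at DEBUG level. A stripped-down sketch of the same log-capture pattern outside the Router test harness is shown below; the class name and the logged message are illustrative only.

import org.apache.hadoop.hdfs.LogVerificationAppender;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;

public class LogCaptureSketch {
  public static void main(String[] args) {
    // Attach the appender to the root logger at DEBUG, as the test does.
    LogVerificationAppender appender = new LogVerificationAppender();
    Logger root = Logger.getRootLogger();
    root.addAppender(appender);
    root.setLevel(Level.DEBUG);

    // Anything logged through log4j is now captured by the appender.
    Logger.getLogger(LogCaptureSketch.class)
        .debug("JMX URL: https://nn.example.com:9871/jmx");

    // Count matching lines, as the assertions in verifyUrlSchemes() do.
    System.out.println(
        appender.countLinesWithMessage("JMX URL: https://")); // 1
  }
}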