
AMBARI-9289. RU - Fix hardcoded Namenode address and JMX port (alejandro)

Alejandro Fernandez authored 10 years ago
commit 714a006d86
23 changed files with 913 additions and 358 deletions
  1. + 1 - 1    ambari-common/src/main/python/resource_management/libraries/script/config_dictionary.py
  2. + 2 - 1    ambari-server/src/main/java/org/apache/ambari/server/checks/AbstractCheckDescriptor.java
  3. + 1 - 1    ambari-server/src/main/java/org/apache/ambari/server/checks/HostsMasterMaintenanceCheck.java
  4. + 1 - 1    ambari-server/src/main/java/org/apache/ambari/server/controller/internal/UpgradeResourceProvider.java
  5. + 119 - 64  ambari-server/src/main/java/org/apache/ambari/server/stack/MasterHostResolver.java
  6. + 17 - 2    ambari-server/src/main/java/org/apache/ambari/server/state/ConfigHelper.java
  7. + 18 - 0    ambari-server/src/main/java/org/apache/ambari/server/utils/HTTPUtils.java
  8. + 28 - 0    ambari-server/src/main/java/org/apache/ambari/server/utils/HostAndPort.java
  9. + 12 - 13   ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/journalnode_upgrade.py
  10. + 1 - 1    ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/namenode.py
  11. + 178 - 0  ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/namenode_ha_state.py
  12. + 9 - 10   ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/namenode_upgrade.py
  13. + 10 - 4   ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/params.py
  14. + 10 - 0   ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/utils.py
  15. + 1 - 1    ambari-server/src/test/java/org/apache/ambari/server/state/CheckHelperTest.java
  16. + 52 - 0   ambari-server/src/test/java/org/apache/ambari/server/utils/TestHTTPUtils.java
  17. + 15 - 8   ambari-server/src/test/python/stacks/2.0.6/HDFS/test_journalnode.py
  18. + 150 - 57 ambari-server/src/test/python/stacks/2.2/configs/journalnode-upgrade-hdfs-secure.json
  19. + 1 - 1    ambari-server/src/test/python/stacks/2.2/configs/journalnode-upgrade-jmx.json
  20. + 117 - 136 ambari-server/src/test/python/stacks/2.2/configs/journalnode-upgrade-namenode-jmx.json
  21. + 10 - 0   ambari-server/src/test/python/stacks/2.2/configs/journalnode-upgrade-namenode-status-active.json
  22. + 10 - 0   ambari-server/src/test/python/stacks/2.2/configs/journalnode-upgrade-namenode-status-standby.json
  23. + 150 - 57 ambari-server/src/test/python/stacks/2.2/configs/journalnode-upgrade.json

+ 1 - 1
ambari-common/src/main/python/resource_management/libraries/script/config_dictionary.py

@@ -76,7 +76,7 @@ class UnknownConfiguration():
     self.name = name
    
   def __getattr__(self, name):
-    raise Fail("Configuration parameter '"+self.name+"' was not found in configurations dictionary!")
+    raise Fail("Configuration parameter '" + self.name + "' was not found in configurations dictionary!")
   
   def __getitem__(self, name):
     """

+ 2 - 1
ambari-server/src/main/java/org/apache/ambari/server/checks/AbstractCheckDescriptor.java

@@ -25,6 +25,7 @@ import org.apache.ambari.server.controller.PrereqCheckRequest;
 import org.apache.ambari.server.orm.dao.HostVersionDAO;
 import org.apache.ambari.server.orm.dao.RepositoryVersionDAO;
 import org.apache.ambari.server.state.Clusters;
+import org.apache.ambari.server.state.ConfigHelper;
 import org.apache.ambari.server.state.stack.PrerequisiteCheck;
 import org.apache.ambari.server.state.stack.PrereqCheckType;
 
@@ -41,7 +42,7 @@ public abstract class AbstractCheckDescriptor {
   Provider<Clusters> clustersProvider;
 
   @Inject
-  Provider<Configuration> configurationProvider;
+  Provider<ConfigHelper> configHelperProvider;
 
   @Inject
   Provider<HostVersionDAO> hostVersionDaoProvider;

+ 1 - 1
ambari-server/src/main/java/org/apache/ambari/server/checks/HostsMasterMaintenanceCheck.java

@@ -46,7 +46,7 @@ public class HostsMasterMaintenanceCheck extends AbstractCheckDescriptor {
   public void perform(PrerequisiteCheck prerequisiteCheck, PrereqCheckRequest request) throws AmbariException {
     final String clusterName = request.getClusterName();
     final Cluster cluster = clustersProvider.get().getCluster(clusterName);
-    final MasterHostResolver masterHostResolver = new MasterHostResolver(cluster);
+    final MasterHostResolver masterHostResolver = new MasterHostResolver(configHelperProvider.get(), cluster);
     final Set<String> hostsWithMasterComponent = new HashSet<String>();
     for (Map.Entry<String, Service> serviceEntry: cluster.getServices().entrySet()) {
       final Service service = serviceEntry.getValue();

+ 1 - 1
ambari-server/src/main/java/org/apache/ambari/server/controller/internal/UpgradeResourceProvider.java

@@ -416,7 +416,7 @@ public class UpgradeResourceProvider extends AbstractControllerResourceProvider
     final String version = (String) requestMap.get(UPGRADE_VERSION);
 
     MasterHostResolver resolver = Direction.UPGRADE == direction ?
-        new MasterHostResolver(cluster) : new MasterHostResolver(cluster, version);
+        new MasterHostResolver(configHelper, cluster) : new MasterHostResolver(configHelper, cluster, version);
 
     UpgradeContext ctx = new UpgradeContext(resolver, version, direction);
 

+ 119 - 64
ambari-server/src/main/java/org/apache/ambari/server/stack/MasterHostResolver.java

@@ -19,6 +19,8 @@
 package org.apache.ambari.server.stack;
 
 import java.lang.reflect.Type;
+import java.net.MalformedURLException;
+import java.text.MessageFormat;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.LinkedHashSet;
@@ -28,10 +30,12 @@ import java.util.Set;
 
 import org.apache.ambari.server.AmbariException;
 import org.apache.ambari.server.state.Cluster;
+import org.apache.ambari.server.state.ConfigHelper;
 import org.apache.ambari.server.state.HostState;
 import org.apache.ambari.server.state.ServiceComponent;
 import org.apache.ambari.server.state.ServiceComponentHost;
 import org.apache.ambari.server.utils.HTTPUtils;
+import org.apache.ambari.server.utils.HostAndPort;
 import org.apache.ambari.server.utils.StageUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -45,6 +49,7 @@ public class MasterHostResolver {
 
   private Cluster m_cluster;
   private String m_version;
+  private ConfigHelper m_configHelper;
 
   enum Service {
     HDFS,
@@ -65,11 +70,11 @@ public class MasterHostResolver {
    * Create a resolver that does not consider HostComponents' version when
    * resolving hosts.  Common use case is creating an upgrade that should
    * include an entire cluster.
-   *
+   * @param configHelper Configuration Helper
    * @param cluster the cluster
    */
-  public MasterHostResolver(Cluster cluster) {
-    this(cluster, null);
+  public MasterHostResolver(ConfigHelper configHelper, Cluster cluster) {
+    this(configHelper, cluster, null);
   }
 
   /**
@@ -77,17 +82,18 @@ public class MasterHostResolver {
    * hosts for the stage.  Common use case is for downgrades when only some
    * HostComponents need to be downgraded, and HostComponents already at the
    * correct version are skipped.
-   *
+   * @param configHelper Configuration Helper
    * @param cluster the cluster
    * @param version the version, or {@code null} to not compare versions
    */
-  public MasterHostResolver(Cluster cluster, String version) {
+  public MasterHostResolver(ConfigHelper configHelper, Cluster cluster, String version) {
+    m_configHelper = configHelper;
     m_cluster = cluster;
     m_version = version;
   }
 
   /**
-   * Gets the cluster that this instace of the {@link MasterHostResolver} is
+   * Gets the cluster that this instance of the {@link MasterHostResolver} is
    * initialized with.
    *
    * @return the cluster (not {@code null}).
@@ -123,37 +129,45 @@ public class MasterHostResolver {
       // !!! nothing to do
     }
 
-    switch (s) {
-      case HDFS:
-        if (componentName.equalsIgnoreCase("NAMENODE")) {
-          Map<Status, String> pair = getNameNodePair(componentHosts);
-          if (pair != null) {
-            hostsType.master = pair.containsKey(Status.ACTIVE) ? pair.get(Status.ACTIVE) :  null;
-            hostsType.secondary = pair.containsKey(Status.STANDBY) ? pair.get(Status.STANDBY) :  null;
+    try {
+      switch (s) {
+        case HDFS:
+          if (componentName.equalsIgnoreCase("NAMENODE")) {
+            if (componentHosts.size() != 2) {
+              return hostsType;
+            }
+
+            Map<Status, String> pair = getNameNodePair();
+            if (pair != null) {
+              hostsType.master = pair.containsKey(Status.ACTIVE) ? pair.get(Status.ACTIVE) :  null;
+              hostsType.secondary = pair.containsKey(Status.STANDBY) ? pair.get(Status.STANDBY) :  null;
+            } else {
+              hostsType.master = componentHosts.iterator().next();
+            }
           } else {
-            hostsType.master = componentHosts.iterator().next();
+            hostsType = filterSameVersion(hostsType, serviceName, componentName);
           }
-        } else {
-          hostsType = filterSameVersion(hostsType, serviceName, componentName);
-        }
-        break;
-      case YARN:
-        if (componentName.equalsIgnoreCase("RESOURCEMANAGER")) {
-          resolveResourceManagers(hostsType);
-        } else {
-          hostsType = filterSameVersion(hostsType, serviceName, componentName);
-        }
-        break;
-      case HBASE:
-        if (componentName.equalsIgnoreCase("HBASE_MASTER")) {
-          resolveHBaseMasters(hostsType);
-        } else {
+          break;
+        case YARN:
+          if (componentName.equalsIgnoreCase("RESOURCEMANAGER")) {
+            resolveResourceManagers(getCluster(), hostsType);
+          } else {
+            hostsType = filterSameVersion(hostsType, serviceName, componentName);
+          }
+          break;
+        case HBASE:
+          if (componentName.equalsIgnoreCase("HBASE_MASTER")) {
+            resolveHBaseMasters(getCluster(), hostsType);
+          } else {
+            hostsType = filterSameVersion(hostsType, serviceName, componentName);
+          }
+          break;
+        case OTHER:
           hostsType = filterSameVersion(hostsType, serviceName, componentName);
-        }
-        break;
-      case OTHER:
-        hostsType = filterSameVersion(hostsType, serviceName, componentName);
-        break;
+          break;
+      }
+    } catch (Exception err) {
+      LOG.error("Unable to get master and hosts for Component " + componentName + ". Error: " + err.getMessage(), err);
     }
     return hostsType;
   }
@@ -202,40 +216,72 @@ public class MasterHostResolver {
 
   /**
    * Get mapping of the HDFS Namenodes from the state ("active" or "standby") to the hostname.
-   * @param hosts Hosts to lookup.
-   * @return Returns a map from the state ("active" or "standby" to the hostname with that state.
+   * @return Returns a map from the state ("active" or "standby") to the hostname with that state if exactly
+   * one active and one standby host were found; otherwise, returns null.
    */
-  private Map<Status, String> getNameNodePair(Set<String> hosts) {
+  private Map<Status, String> getNameNodePair() {
     Map<Status, String> stateToHost = new HashMap<Status, String>();
+    Cluster cluster = getCluster();
 
-    if (hosts != null && hosts.size() == 2) {
-      for (String hostname : hosts) {
-        String state = queryJmxBeanValue(hostname, 50070,
-            "Hadoop:service=NameNode,name=NameNodeStatus", "State", true);
+    String nameService = m_configHelper.getValueFromDesiredConfigurations(cluster, ConfigHelper.HDFS_SITE, "dfs.nameservices");
+    if (nameService == null || nameService.isEmpty()) {
+      return null;
+    }
 
-        if (null != state &&
-            (state.equalsIgnoreCase(Status.ACTIVE.toString()) ||
-                state.equalsIgnoreCase(Status.STANDBY.toString()))) {
-            Status status = Status.valueOf(state.toUpperCase());
-            stateToHost.put(status, hostname);
-          }
+    String nnUniqueIDstring = m_configHelper.getValueFromDesiredConfigurations(cluster, ConfigHelper.HDFS_SITE, "dfs.ha.namenodes." + nameService);
+    if (nnUniqueIDstring == null || nnUniqueIDstring.isEmpty()) {
+      return null;
+    }
+
+    String[] nnUniqueIDs = nnUniqueIDstring.split(",");
+    if (nnUniqueIDs == null || nnUniqueIDs.length != 2) {
+      return null;
+    }
+
+    String policy = m_configHelper.getValueFromDesiredConfigurations(cluster, ConfigHelper.HDFS_SITE, "dfs.http.policy");
+    boolean encrypted = (policy != null && policy.equalsIgnoreCase(ConfigHelper.HTTPS_ONLY));
+
+    String namenodeFragment = "dfs.namenode." + (encrypted ? "https-address" : "http-address") + ".{0}.{1}";
+
+    for (String nnUniqueID : nnUniqueIDs) {
+      String key = MessageFormat.format(namenodeFragment, nameService, nnUniqueID);
+      String value = m_configHelper.getValueFromDesiredConfigurations(cluster, ConfigHelper.HDFS_SITE, key);
+
+      try {
+        HostAndPort hp = HTTPUtils.getHostAndPortFromProperty(value);
+        if (hp == null) {
+          throw new MalformedURLException("Could not parse host and port from " + value);
         }
 
-      if (stateToHost.containsKey(Status.ACTIVE) && stateToHost.containsKey(Status.STANDBY) && !stateToHost.get(Status.ACTIVE).equalsIgnoreCase(stateToHost.get(Status.STANDBY))) {
-        return stateToHost;
+        String state = queryJmxBeanValue(hp.host, hp.port, "Hadoop:service=NameNode,name=NameNodeStatus", "State", true, encrypted);
+
+        if (null != state && (state.equalsIgnoreCase(Status.ACTIVE.toString()) || state.equalsIgnoreCase(Status.STANDBY.toString()))) {
+          Status status = Status.valueOf(state.toUpperCase());
+          stateToHost.put(status, hp.host);
+        }
+      } catch (MalformedURLException e) {
+        LOG.error(e.getMessage());
       }
     }
 
+    if (stateToHost.containsKey(Status.ACTIVE) && stateToHost.containsKey(Status.STANDBY) && !stateToHost.get(Status.ACTIVE).equalsIgnoreCase(stateToHost.get(Status.STANDBY))) {
+      return stateToHost;
+    }
     return null;
   }
 
-  private void resolveResourceManagers(HostsType hostType) {
-    // !!! for RM, only the master returns jmx
+  private void resolveResourceManagers(Cluster cluster, HostsType hostType) throws MalformedURLException {
     Set<String> orderedHosts = new LinkedHashSet<String>(hostType.hosts);
 
-    for (String hostname : hostType.hosts) {
+    // IMPORTANT, for RM, only the master returns jmx
+    String rmWebAppAddress = m_configHelper.getValueFromDesiredConfigurations(cluster, ConfigHelper.YARN_SITE, "yarn.resourcemanager.webapp.address");
+    HostAndPort hp = HTTPUtils.getHostAndPortFromProperty(rmWebAppAddress);
+    if (hp == null) {
+      throw new MalformedURLException("Could not parse host and port from " + rmWebAppAddress);
+    }
 
-      String value = queryJmxBeanValue(hostname, 8088,
+    for (String hostname : hostType.hosts) {
+      String value = queryJmxBeanValue(hostname, hp.port,
           "Hadoop:service=ResourceManager,name=RMNMInfo", "modelerType", true);
 
       if (null != value) {
@@ -243,7 +289,7 @@ public class MasterHostResolver {
           hostType.master = hostname;
         }
 
-        // !!! quick and dirty to make sure the master is last in the list
+        // Quick and dirty to make sure the master is last in the list
         orderedHosts.remove(hostname);
         orderedHosts.add(hostname);
       }
@@ -252,11 +298,17 @@ public class MasterHostResolver {
     hostType.hosts = orderedHosts;
   }
 
-  private void resolveHBaseMasters(HostsType hostsType) {
+  private void resolveHBaseMasters(Cluster cluster, HostsType hostsType) throws AmbariException {
+    String hbaseMasterInfoPortProperty = "hbase.master.info.port";
+    String hbaseMasterInfoPortValue = m_configHelper.getValueFromDesiredConfigurations(cluster, ConfigHelper.HBASE_SITE, hbaseMasterInfoPortProperty);
 
-    for (String hostname : hostsType.hosts) {
+    if (hbaseMasterInfoPortValue == null || hbaseMasterInfoPortValue.isEmpty()) {
+      throw new AmbariException("Could not find property " + hbaseMasterInfoPortProperty);
+    }
 
-      String value = queryJmxBeanValue(hostname, 60010,
+    final int hbaseMasterInfoPort = Integer.parseInt(hbaseMasterInfoPortValue);
+    for (String hostname : hostsType.hosts) {
+      String value = queryJmxBeanValue(hostname, hbaseMasterInfoPort,
           "Hadoop:service=HBase,name=Master,sub=Server", "tag.isActiveMaster", false);
 
       if (null != value) {
@@ -272,11 +324,17 @@ public class MasterHostResolver {
   }
 
   private String queryJmxBeanValue(String hostname, int port, String beanName, String attributeName,
-      boolean asQuery) {
+                                   boolean asQuery) {
+    return queryJmxBeanValue(hostname, port, beanName, attributeName, asQuery, false);
+  }
+
+  private String queryJmxBeanValue(String hostname, int port, String beanName, String attributeName,
+      boolean asQuery, boolean encrypted) {
 
-    String endPoint = asQuery ?
-        String.format("http://%s:%s/jmx?qry=%s", hostname, port, beanName) :
-          String.format("http://%s:%s/jmx?get=%s::%s", hostname, port, beanName, attributeName);
+    String protocol = encrypted ? "https://" : "http://";
+    String endPoint = protocol + (asQuery ?
+        String.format("%s:%s/jmx?qry=%s", hostname, port, beanName) :
+        String.format("%s:%s/jmx?get=%s::%s", hostname, port, beanName, attributeName));
 
     String response = HTTPUtils.requestURL(endPoint);
 
@@ -296,8 +354,5 @@ public class MasterHostResolver {
     }
 
     return null;
-
   }
-
-
 }
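
For orientation, a sketch of how the resolver now derives the NameNode JMX endpoint instead of hardcoding port 50070; the nameservice "mycluster" and NameNode ID "nn1" below are illustrative, not from this commit:

    boolean encrypted = false; // true when dfs.http.policy is HTTPS_ONLY
    String fragment = "dfs.namenode." + (encrypted ? "https-address" : "http-address") + ".{0}.{1}";
    String key = java.text.MessageFormat.format(fragment, "mycluster", "nn1");
    // key == "dfs.namenode.http-address.mycluster.nn1"; its hdfs-site value,
    // e.g. "c6401.ambari.apache.org:50070", is parsed by HTTPUtils.getHostAndPortFromProperty
    // and queried at:
    // http://c6401.ambari.apache.org:50070/jmx?qry=Hadoop:service=NameNode,name=NameNodeStatus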

+ 17 - 2
ambari-server/src/main/java/org/apache/ambari/server/state/ConfigHelper.java

@@ -68,6 +68,17 @@ public class ConfigHelper {
   private static final Logger LOG =
       LoggerFactory.getLogger(ConfigHelper.class);
 
+  /**
+   * List of property prefixes and names. Please keep in alphabetical order.
+   */
+  public static final String HBASE_SITE = "hbase-site";
+  public static final String HDFS_SITE = "hdfs-site";
+  public static final String HIVE_SITE = "hive-site";
+  public static final String YARN_SITE = "yarn-site";
+
+  public static final String HTTP_ONLY = "HTTP_ONLY";
+  public static final String HTTPS_ONLY = "HTTPS_ONLY";
+
   @Inject
   public ConfigHelper(Clusters c, AmbariMetaInfo metaInfo, Configuration configuration, ClusterDAO clusterDAO) {
     clusters = c;
@@ -530,6 +541,11 @@ public class ConfigHelper {
         placeholder.length());
 
     // return the value if it exists, otherwise return the placeholder
+    String value = getValueFromDesiredConfigurations(cluster, configType, propertyName);
+    return value != null ? value : placeholder;
+  }
+
+  public String getValueFromDesiredConfigurations(Cluster cluster, String configType, String propertyName) {
     Map<String, DesiredConfig> desiredConfigs = cluster.getDesiredConfigs();
     DesiredConfig desiredConfig = desiredConfigs.get(configType);
     Config config = cluster.getConfig(configType, desiredConfig.getTag());
@@ -540,8 +556,7 @@ public class ConfigHelper {
         return value;
       }
     }
-
-    return placeholder;
+    return null;
   }
 
   public ServiceInfo getPropertyOwnerService(Cluster cluster, String configType, String propertyName) throws AmbariException {
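
A hedged sketch of calling the new accessor, where cluster and configHelper stand in for existing instances:

    // Returns the property's value from the cluster's desired configuration, or null if undefined.
    String nameService = configHelper.getValueFromDesiredConfigurations(
        cluster, ConfigHelper.HDFS_SITE, "dfs.nameservices");
    if (nameService == null || nameService.isEmpty()) {
      // No HA nameservice is configured; callers such as MasterHostResolver bail out here.
    }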

+ 18 - 0
ambari-server/src/main/java/org/apache/ambari/server/utils/HTTPUtils.java

@@ -77,4 +77,22 @@ public class HTTPUtils {
     }
     return result;
   }
+
+  /**
+   * Given a property like 0.0.0.0:1234 or c6401.ambari.apache.org:50070
+   * will extract the hostname and the port number
+   * @param value Address
+   * @return Return the host and port if it is a valid string, otherwise null.
+   */
+  public static HostAndPort getHostAndPortFromProperty(String value) {
+    if (value == null || value.isEmpty()) return null;
+    value = value.trim();
+    int colonIndex = value.indexOf(":");
+    if (colonIndex > 0 && colonIndex < value.length() - 1) {
+      String host = value.substring(0, colonIndex);
+      int port = Integer.parseInt(value.substring(colonIndex + 1, value.length())); // account for the ":"
+      return new HostAndPort(host, port);
+    }
+    return null;
+  }
 }

+ 28 - 0
ambari-server/src/main/java/org/apache/ambari/server/utils/HostAndPort.java

@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.utils;
+
+public class HostAndPort {
+  public String host;
+  public int port;
+
+  public HostAndPort(String host, int port) {
+    this.host = host;
+    this.port = port;
+  }
+}

+ 12 - 13
ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/journalnode_upgrade.py

@@ -24,7 +24,7 @@ from resource_management.core.resources.system import Execute
 from resource_management.libraries.functions.default import default
 from resource_management.core.exceptions import Fail
 from utils import get_jmx_data
-
+from namenode_ha_state import NAMENODE_STATE, NamenodeHAState
 
 
 def post_upgrade_check():
@@ -35,6 +35,9 @@ def post_upgrade_check():
   import params
   Logger.info("Ensuring Journalnode quorum is established")
 
+  if params.security_enabled:
+    Execute(params.jn_kinit_cmd, user=params.hdfs_user)
+
   time.sleep(5)
   hdfs_roll_edits()
   time.sleep(5)
@@ -44,18 +47,16 @@ def post_upgrade_check():
   if len(all_journal_node_hosts) < 3:
     raise Fail("Need at least 3 Journalnodes to maintain a quorum")
 
-  # TODO, test with HTTPS
-  policy = default("/configurations/hdfs-site/dfs.http.policy", "HTTP_ONLY")
-  encrypted = policy.upper == "HTTPS_ONLY"
-
-  nn_address = default("/configurations/hdfs-site/dfs.namenode.https-address", None) if encrypted else \
-    default("/configurations/hdfs-site/dfs.namenode.http-address", None)
+  try:
+    namenode_ha = NamenodeHAState()
+  except ValueError, err:
+    raise Fail("Could not retrieve Namenode HA addresses. Error: " + str(err))
 
-  if not nn_address:
-    raise Fail("Could not retrieve dfs.namenode.http(s)-address for policy %s" % str(policy))
+  Logger.info(str(namenode_ha))
+  nn_address = namenode_ha.get_address(NAMENODE_STATE.ACTIVE)
 
   nn_data = get_jmx_data(nn_address, 'org.apache.hadoop.hdfs.server.namenode.FSNamesystem', 'JournalTransactionInfo',
-                         encrypted)
+                         namenode_ha.is_encrypted())
   if not nn_data:
     raise Fail("Could not retrieve JournalTransactionInfo from JMX")
 
@@ -73,13 +74,11 @@ def hdfs_roll_edits():
   """
   HDFS_CLIENT needs to be a dependency of JOURNALNODE
   Roll the logs so that Namenode will be able to connect to the Journalnode.
+  Must kinit before calling this command.
   """
   import params
 
   # TODO, this will be to be doc'ed since existing HDP 2.2 clusters will needs HDFS_CLIENT on all JOURNALNODE hosts
-  if params.security_enabled:
-    Execute(params.dn_kinit_cmd, user=params.hdfs_user)
-
   command = 'hdfs dfsadmin -rollEdits'
   Execute(command, user=params.hdfs_user, tries=1)
 

+ 1 - 1
ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/namenode.py

@@ -79,7 +79,7 @@ class NameNode(Script):
     env.set_params(params)
 
     Execute("hdfs dfsadmin -report -live",
-            user=params.hdfs_principal_name if params.security_enabled else params.hdfs_user
+            user=params.hdfs_user
     )
 
   def stop(self, env, rolling_restart=False):

+ 178 - 0
ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/namenode_ha_state.py

@@ -0,0 +1,178 @@
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+"""
+
+
+from resource_management.core.logger import Logger
+from resource_management.libraries.functions.default import default
+from utils import get_value_from_jmx
+
+
+class NAMENODE_STATE:
+  ACTIVE = "active"
+  STANDBY = "standby"
+  UNKNOWN = "unknown"
+
+
+class NamenodeHAState:
+  """
+  Represents the current state of the Namenode Hosts in High Availability Mode
+  """
+
+  def __init__(self):
+    """
+    Initializes all fields by querying the Namenode state.
+    Raises a ValueError if unable to construct the object.
+    """
+    import params
+
+    self.name_service = default("/configurations/hdfs-site/dfs.nameservices", None)
+    if not self.name_service:
+      raise ValueError("Could not retrieve property dfs.nameservices")
+
+    nn_unique_ids_key = "dfs.ha.namenodes." + str(self.name_service)
+    # List of the nn unique ids
+    self.nn_unique_ids = default("/configurations/hdfs-site/" + nn_unique_ids_key, None)
+    if not self.nn_unique_ids:
+      raise ValueError("Could not retrieve property " + nn_unique_ids_key)
+
+    self.nn_unique_ids = self.nn_unique_ids.split(",")
+    self.nn_unique_ids = [x.strip() for x in self.nn_unique_ids]
+
+    policy = default("/configurations/hdfs-site/dfs.http.policy", "HTTP_ONLY")
+    self.encrypted = policy.upper() == "HTTPS_ONLY"
+
+    jmx_uri_fragment = ("https" if self.encrypted else "http") + "://{0}/jmx?qry=Hadoop:service=NameNode,name=NameNodeStatus"
+    namenode_http_fragment = "dfs.namenode.http-address.{0}.{1}"
+    namenode_https_fragment = "dfs.namenode.https-address.{0}.{1}"
+
+    # Dictionary where the key is the Namenode State (e.g., ACTIVE), and the value is a set of hostnames
+    self.namenode_state_to_hostnames = {}
+
+    # Dictionary from nn unique id name to a tuple of (http address, https address)
+    self.nn_unique_id_to_addresses = {}
+    for nn_unique_id in self.nn_unique_ids:
+      http_key = namenode_http_fragment.format(self.name_service, nn_unique_id)
+      https_key = namenode_https_fragment.format(self.name_service, nn_unique_id)
+
+      http_value = default("/configurations/hdfs-site/" + http_key, None)
+      https_value = default("/configurations/hdfs-site/" + https_key, None)
+      actual_value = https_value if self.encrypted else http_value
+      hostname = actual_value.split(":")[0].strip() if actual_value and ":" in actual_value else None
+
+      self.nn_unique_id_to_addresses[nn_unique_id] = (http_value, https_value)
+      try:
+        if not hostname:
+          raise Exception("Could not retrieve hostname from address " + actual_value)
+
+        jmx_uri = jmx_uri_fragment.format(actual_value)
+        state = get_value_from_jmx(jmx_uri, "State")
+
+        if not state:
+          raise Exception("Could not retrieve Namenode state from URL " + jmx_uri)
+
+        state = state.lower()
+
+        if state not in [NAMENODE_STATE.ACTIVE, NAMENODE_STATE.STANDBY]:
+          state = NAMENODE_STATE.UNKNOWN
+
+        if state in self.namenode_state_to_hostnames:
+          self.namenode_state_to_hostnames[state].add(hostname)
+        else:
+          hostnames = set([hostname, ])
+          self.namenode_state_to_hostnames[state] = hostnames
+      except:
+        Logger.error("Could not get namenode state for " + nn_unique_id)
+
+  def __str__(self):
+    return "Namenode HA State: {\n" + \
+           ("IDs: %s\n"       % ", ".join(self.nn_unique_ids)) + \
+           ("Addresses: %s\n" % str(self.nn_unique_id_to_addresses)) + \
+           ("States: %s\n"    % str(self.namenode_state_to_hostnames)) + \
+           ("Encrypted: %s\n" % str(self.encrypted)) + \
+           ("Healthy: %s\n"   % str(self.is_healthy())) + \
+           "}"
+
+  def is_encrypted(self):
+    """
+    :return: Returns a bool indicating if HTTPS is enabled
+    """
+    return self.encrypted
+
+  def get_nn_unique_ids(self):
+    """
+    :return Returns a list of the nn unique ids
+    """
+    return self.nn_unique_ids
+
+  def get_nn_unique_id_to_addresses(self):
+    """
+    :return Returns a dictionary where the key is the nn unique id, and the value is a tuple of (http address, https address)
+    Each address is of the form, hostname:port
+    """
+    return self.nn_unique_id_to_addresses
+
+  def get_address_for_nn_id(self, id):
+    """
+    :param id: Namenode ID
+    :return: Returns the appropriate address (HTTP if no encryption, HTTPS otherwise) for the given namenode id.
+    """
+    if id in self.nn_unique_id_to_addresses:
+      addresses = self.nn_unique_id_to_addresses[id]
+      if addresses and len(addresses) == 2:
+        return addresses[1] if self.encrypted else addresses[0]
+    return None
+
+  def get_address_for_host(self, hostname):
+    """
+    :param hostname: Host name
+    :return: Returns the appropriate address (HTTP if no encryption, HTTPS otherwise) for the given host.
+    """
+    for id, addresses in self.nn_unique_id_to_addresses.iteritems():
+      if addresses and len(addresses) == 2:
+        if ":" in addresses[0]:
+          nn_hostname = addresses[0].split(":")[0].strip()
+          if nn_hostname == hostname:
+            # Found the host
+            return addresses[1] if self.encrypted else addresses[0]
+    return None
+
+  def get_namenode_state_to_hostnames(self):
+    """
+    :return Return a dictionary where the key is a member of NAMENODE_STATE, and the value is a set of hostnames.
+    """
+    return self.namenode_state_to_hostnames
+
+  def get_address(self, namenode_state):
+    """
+    @param namenode_state: Member of NAMENODE_STATE
+    :return Get the address that corresponds to the first host with the given state
+    """
+    hosts = self.namenode_state_to_hostnames[namenode_state] if namenode_state in self.namenode_state_to_hostnames else []
+    if hosts and len(hosts) > 0:
+      hostname = list(hosts)[0]
+      return self.get_address_for_host(hostname)
+    return None
+
+  def is_healthy(self):
+    """
+    :return: Returns a bool indicating if exactly one ACTIVE and one STANDBY host exist.
+    """
+    active_hosts = self.namenode_state_to_hostnames[NAMENODE_STATE.ACTIVE] if NAMENODE_STATE.ACTIVE in self.namenode_state_to_hostnames else []
+    standby_hosts = self.namenode_state_to_hostnames[NAMENODE_STATE.STANDBY] if NAMENODE_STATE.STANDBY in self.namenode_state_to_hostnames else []
+    return len(active_hosts) == 1 and len(standby_hosts) == 1

+ 9 - 10
ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/namenode_upgrade.py

@@ -74,23 +74,20 @@ def prepare_rolling_upgrade():
   Logger.info("Executing Rolling Upgrade prepare")
   import params
 
-  user = params.hdfs_principal_name if params.security_enabled else params.hdfs_user
-
   if params.security_enabled:
-    Execute(format("{kinit_path_local} -kt {hdfs_user_keytab} {hdfs_principal_name}"),
-            user=user)
+    Execute(format("{params.kinit_path_local} -kt {params.hdfs_user_keytab} {params.hdfs_principal_name}"))
 
-  safemode_transition_successful = reach_safemode_state(user, SAFEMODE.OFF, True)
+  safemode_transition_successful = reach_safemode_state(params.hdfs_user, SAFEMODE.OFF, True)
   if not safemode_transition_successful:
     raise Fail("Could not transition to safemode state %s. Please check logs to make sure namenode is up." % str(SAFEMODE.OFF))
 
   prepare = "hdfs dfsadmin -rollingUpgrade prepare"
   query = "hdfs dfsadmin -rollingUpgrade query"
   Execute(prepare,
-          user=user,
+          user=params.hdfs_user,
           logoutput=True)
   Execute(query,
-          user=user,
+          user=params.hdfs_user,
           logoutput=True)
 
 
@@ -101,12 +98,14 @@ def finalize_rolling_upgrade():
   Logger.info("Executing Rolling Upgrade finalize")
   import params
 
-  user = params.hdfs_principal_name if params.security_enabled else params.hdfs_user
+  if params.security_enabled:
+    Execute(format("{params.kinit_path_local} -kt {params.hdfs_user_keytab} {params.hdfs_principal_name}"))
+
   finalize_cmd = "hdfs dfsadmin -rollingUpgrade finalize"
   Execute(finalize_cmd,
-          user=user,
+          user=params.hdfs_user,
           logoutput=True)
 
-  safemode_transition_successful = reach_safemode_state(user, SAFEMODE.OFF, True)
+  safemode_transition_successful = reach_safemode_state(params.hdfs_user, SAFEMODE.OFF, True)
   if not safemode_transition_successful:
     Logger.warning("Could leave safemode")

+ 10 - 4
ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/params.py

@@ -151,7 +151,7 @@ hive_user = config['configurations']['hive-env']['hive_user']
 smoke_user =  config['configurations']['cluster-env']['smokeuser']
 smokeuser_principal =  config['configurations']['cluster-env']['smokeuser_principal_name']
 mapred_user = config['configurations']['mapred-env']['mapred_user']
-hdfs_principal_name = config['configurations']['hadoop-env']['hdfs_principal_name']
+hdfs_principal_name = default('/configurations/hadoop-env/hdfs_principal_name', None)
 
 user_group = config['configurations']['cluster-env']['user_group']
 proxyuser_group =  config['configurations']['hadoop-env']['proxyuser_group']
@@ -209,7 +209,7 @@ if dfs_ha_enabled:
       namenode_id = nn_id
       namenode_rpc = nn_host
 
-if dfs_http_policy == "HTTPS_ONLY":
+if dfs_http_policy is not None and dfs_http_policy.upper() == "HTTPS_ONLY":
   https_only = True
   journalnode_address = default('/configurations/hdfs-site/dfs.journalnode.https-address', None)
 else:
@@ -231,10 +231,16 @@ if security_enabled:
   _nn_keytab = config['configurations']['hdfs-site']['dfs.namenode.keytab.file']
   _nn_principal_name = _nn_principal_name.replace('_HOST',hostname.lower())
   
-  nn_kinit_cmd = format("{kinit_path_local} -kt {_nn_keytab} {_nn_principal_name};")  
+  nn_kinit_cmd = format("{kinit_path_local} -kt {_nn_keytab} {_nn_principal_name};")
+
+  _jn_principal_name = config['configurations']['hdfs-site']['dfs.journalnode.kerberos.principal']
+  _jn_principal_name = _jn_principal_name.replace('_HOST', hostname.lower())
+  _jn_keytab = config['configurations']['hdfs-site']['dfs.journalnode.keytab.file']
+  jn_kinit_cmd = format("{kinit_path_local} -kt {_jn_keytab} {_jn_principal_name};")
 else:
   dn_kinit_cmd = ""
-  nn_kinit_cmd = ""  
+  nn_kinit_cmd = ""
+  jn_kinit_cmd = ""
 
 import functools
 #create partial functions with common arguments for every HdfsDirectory call

+ 10 - 0
ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/utils.py

@@ -218,6 +218,16 @@ def service(action=None, name=None, user=None, options="", create_pid_dir=False,
     )
 
 
+def get_value_from_jmx(qry, property):
+  try:
+    response = urllib2.urlopen(qry)
+    data = response.read()
+    if data:
+      data_dict = json.loads(data)
+      return data_dict["beans"][0][property]
+  except:
+    return None
+
 def get_jmx_data(nn_address, modeler_type, metric, encrypted=False):
   """
   :param nn_address: Namenode Address, e.g., host:port, ** MAY ** be preceded with "http://" or "https://" already.

+ 1 - 1
ambari-server/src/test/java/org/apache/ambari/server/state/CheckHelperTest.java

@@ -115,7 +115,7 @@ public class CheckHelperTest {
       @Override
       protected void configure() {
         bind(Clusters.class).toInstance(clusters);
-        bind(Configuration.class).toProvider(Providers.<Configuration>of(null));
+        bind(ConfigHelper.class).toProvider(Providers.<ConfigHelper>of(null));
         bind(HostVersionDAO.class).toProvider(Providers.<HostVersionDAO>of(null));
         bind(RepositoryVersionDAO.class).toProvider(Providers.<RepositoryVersionDAO>of(null));
       }

+ 52 - 0
ambari-server/src/test/java/org/apache/ambari/server/utils/TestHTTPUtils.java

@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.utils;
+
+import junit.framework.Assert;
+import org.junit.Test;
+
+public class TestHTTPUtils {
+
+  @Test
+  public void testGetHostAndPortFromProperty() {
+    String value = null;
+    HostAndPort hp = HTTPUtils.getHostAndPortFromProperty(value);
+    Assert.assertNull(hp);
+
+    value = "";
+    hp = HTTPUtils.getHostAndPortFromProperty(value);
+    Assert.assertNull(hp);
+
+    value = "c6401.ambari.apache.org";
+    hp = HTTPUtils.getHostAndPortFromProperty(value);
+    Assert.assertNull(hp);
+
+    value = "c6401.ambari.apache.org:";
+    hp = HTTPUtils.getHostAndPortFromProperty(value);
+    Assert.assertNull(hp);
+
+    value = "c6401.ambari.apache.org:50070";
+    hp = HTTPUtils.getHostAndPortFromProperty(value);
+    Assert.assertEquals(hp.host, "c6401.ambari.apache.org");
+    Assert.assertEquals(hp.port, 50070);
+
+    value = "  c6401.ambari.apache.org:50070   ";
+    Assert.assertEquals(hp.host, "c6401.ambari.apache.org");
+    Assert.assertEquals(hp.port, 50070);
+  }
+}

+ 15 - 8
ambari-server/src/test/python/stacks/2.0.6/HDFS/test_journalnode.py

@@ -253,18 +253,26 @@ class TestJournalnode(RMFTestCase):
   def test_post_rolling_restart(self, urlopen_mock, time_mock):
     # load the NN and JN JMX files so that the urllib2.urlopen mock has data
     # to return
+    num_journalnodes = 3
     journalnode_jmx_file = os.path.join(RMFTestCase._getStackTestsFolder(),
-      self.UPGRADE_STACK_VERSION, "configs", "journalnode-upgrade-jmx.json" )
+      self.UPGRADE_STACK_VERSION, "configs", "journalnode-upgrade-jmx.json")
 
     namenode_jmx_file = os.path.join(RMFTestCase._getStackTestsFolder(),
-      self.UPGRADE_STACK_VERSION, "configs", "journalnode-upgrade-namenode-jmx.json" )
+      self.UPGRADE_STACK_VERSION, "configs", "journalnode-upgrade-namenode-jmx.json")
+
+    namenode_status_active_file = os.path.join(RMFTestCase._getStackTestsFolder(),
+      self.UPGRADE_STACK_VERSION, "configs", "journalnode-upgrade-namenode-status-active.json")
+
+    namenode_status_standby_file = os.path.join(RMFTestCase._getStackTestsFolder(),
+      self.UPGRADE_STACK_VERSION, "configs", "journalnode-upgrade-namenode-status-standby.json")
 
     journalnode_jmx = open(journalnode_jmx_file, 'r').read()
     namenode_jmx = open(namenode_jmx_file, 'r').read()
+    namenode_status_active = open(namenode_status_active_file, 'r').read()
+    namenode_status_standby = open(namenode_status_standby_file, 'r').read()
 
     url_stream_mock = MagicMock()
-    url_stream_mock.read.side_effect = [namenode_jmx, journalnode_jmx,
-      journalnode_jmx, journalnode_jmx]
+    url_stream_mock.read.side_effect = [namenode_status_active, namenode_status_standby] + (num_journalnodes * [namenode_jmx, journalnode_jmx])
 
     urlopen_mock.return_value = url_stream_mock
 
@@ -277,11 +285,10 @@ class TestJournalnode(RMFTestCase):
 
     # ensure that the mock was called with the http-style version of the URL
     urlopen_mock.assert_called
-    urlopen_mock.assert_called_with( "http://c6403.ambari.apache.org:8480/jmx" )
+    urlopen_mock.assert_called_with("http://c6407.ambari.apache.org:8480/jmx")
 
     url_stream_mock.reset_mock()
-    url_stream_mock.read.side_effect = [namenode_jmx, journalnode_jmx,
-      journalnode_jmx, journalnode_jmx]
+    url_stream_mock.read.side_effect = [namenode_status_active, namenode_status_standby] + (num_journalnodes * [namenode_jmx, journalnode_jmx])
 
     urlopen_mock.return_value = url_stream_mock
 
@@ -294,7 +301,7 @@ class TestJournalnode(RMFTestCase):
 
     # ensure that the mock was called with the http-style version of the URL
     urlopen_mock.assert_called
-    urlopen_mock.assert_called_with( "https://c6403.ambari.apache.org:8481/jmx" )
+    urlopen_mock.assert_called_with("https://c6407.ambari.apache.org:8481/jmx")
 
 
 

File diff suppressed because it is too large
+ 150 - 57
ambari-server/src/test/python/stacks/2.2/configs/journalnode-upgrade-hdfs-secure.json


+ 1 - 1
ambari-server/src/test/python/stacks/2.2/configs/journalnode-upgrade-jmx.json

@@ -74,7 +74,7 @@
     "TxnsWritten" : 592,
     "BytesWritten" : 60442,
     "BatchesWrittenWhileLagging" : 0,
-    "LastWrittenTxId" : 4499,
+    "LastWrittenTxId" : 1001,
     "LastPromisedEpoch" : 2,
     "LastWriterEpoch" : 2,
     "CurrentLagTxns" : 0

File diff suppressed because it is too large
+ 117 - 136
ambari-server/src/test/python/stacks/2.2/configs/journalnode-upgrade-namenode-jmx.json


+ 10 - 0
ambari-server/src/test/python/stacks/2.2/configs/journalnode-upgrade-namenode-status-active.json

@@ -0,0 +1,10 @@
+{
+  "beans" : [ {
+    "name" : "Hadoop:service=NameNode,name=NameNodeStatus",
+    "modelerType" : "org.apache.hadoop.hdfs.server.namenode.NameNode",
+    "State" : "active",
+    "NNRole" : "NameNode",
+    "HostAndPort" : "c6406.ambari.apache.org:8020",
+    "SecurityEnabled" : true
+  } ]
+}

+ 10 - 0
ambari-server/src/test/python/stacks/2.2/configs/journalnode-upgrade-namenode-status-standby.json

@@ -0,0 +1,10 @@
+{
+  "beans" : [ {
+    "name" : "Hadoop:service=NameNode,name=NameNodeStatus",
+    "modelerType" : "org.apache.hadoop.hdfs.server.namenode.NameNode",
+    "State" : "standby",
+    "NNRole" : "NameNode",
+    "HostAndPort" : "c6407.ambari.apache.org:8020",
+    "SecurityEnabled" : true
+  } ]
+}

File diff suppressed because it is too large
+ 150 - 57
ambari-server/src/test/python/stacks/2.2/configs/journalnode-upgrade.json


Some files were not shown because too many files changed in this diff