|
@@ -26,7 +26,8 @@ from resource_management.core import shell
|
|
|
from resource_management.core.logger import Logger
|
|
|
from resource_management.libraries.functions.decorator import retry
|
|
|
|
|
|
-__all__ = ["get_namenode_states", "get_active_namenode", "get_property_for_active_namenode"]
|
|
|
+__all__ = ["get_namenode_states", "get_active_namenode",
|
|
|
+ "get_property_for_active_namenode", "get_nameservice"]
|
|
|
|
|
|
HDFS_NN_STATE_ACTIVE = 'active'
|
|
|
HDFS_NN_STATE_STANDBY = 'standby'
|
|
@@ -67,7 +68,7 @@ def get_namenode_states_noretries(hdfs_site, security_enabled, run_user):
|
|
|
standby_namenodes = []
|
|
|
unknown_namenodes = []
|
|
|
|
|
|
- name_service = hdfs_site['dfs.nameservices']
|
|
|
+ name_service = get_nameservice(hdfs_site)
|
|
|
nn_unique_ids_key = 'dfs.ha.namenodes.' + name_service
|
|
|
|
|
|
# now we have something like 'nn1,nn2,nn3,nn4'
|
|
@@ -117,7 +118,7 @@ def get_namenode_states_noretries(hdfs_site, security_enabled, run_user):
|
|
|
return active_namenodes, standby_namenodes, unknown_namenodes
|
|
|
|
|
|
def is_ha_enabled(hdfs_site):
|
|
|
- dfs_ha_nameservices = hdfs_site['dfs.nameservices']
|
|
|
+ dfs_ha_nameservices = get_nameservice(hdfs_site)
|
|
|
|
|
|
if is_empty(dfs_ha_nameservices):
|
|
|
return False
|
|
@@ -151,7 +152,7 @@ def get_property_for_active_namenode(hdfs_site, property_name, security_enabled,
|
|
|
value = None
|
|
|
rpc_key = None
|
|
|
if is_ha_enabled(hdfs_site):
|
|
|
- name_service = hdfs_site['dfs.nameservices']
|
|
|
+ name_service = get_nameservice(hdfs_site)
|
|
|
active_namenodes = get_namenode_states(hdfs_site, security_enabled, run_user)[0]
|
|
|
|
|
|
if not len(active_namenodes):
|
|
@@ -171,3 +172,31 @@ def get_property_for_active_namenode(hdfs_site, property_name, security_enabled,
|
|
|
value = value.replace(INADDR_ANY, rpc_host)
|
|
|
|
|
|
return value
|
|
|
+
|
|
|
+def get_nameservice(hdfs_site):
|
|
|
+ """
|
|
|
+ Multiple nameservices can be configured for example to support seamless distcp
|
|
|
+ between two HA clusters. The nameservices are defined as a comma separated
|
|
|
+ list in hdfs_site['dfs.nameservices']. The parameter
|
|
|
+ hdfs['dfs.internal.nameservices'] was introduced in Hadoop 2.6 to denote the
|
|
|
+ nameservice for the current cluster (HDFS-6376).
|
|
|
+
|
|
|
+ This method uses hdfs['dfs.internal.nameservices'] to get the current
|
|
|
+ nameservice, if that parameter is not available it tries to splits the value
|
|
|
+ in hdfs_site['dfs.nameservices'] returning the first string or what is
|
|
|
+ contained in hdfs_site['dfs.namenode.shared.edits.dir'].
|
|
|
+
|
|
|
+ By default hdfs_site['dfs.nameservices'] is returned.
|
|
|
+ :param hdfs_site:
|
|
|
+ :return: string or empty
|
|
|
+ """
|
|
|
+ name_service = hdfs_site.get('dfs.internal.nameservices', None)
|
|
|
+ if not name_service:
|
|
|
+ import re
|
|
|
+ name_service = hdfs_site.get('dfs.nameservices', None)
|
|
|
+ if name_service:
|
|
|
+ for ns in name_service.split(","):
|
|
|
+ if 'dfs.namenode.shared.edits.dir' in hdfs_site and re.match(r'.*%s$' % ns, hdfs_site['dfs.namenode.shared.edits.dir']): # better would be core_site['fs.defaultFS'] but it's not available
|
|
|
+ return ns
|
|
|
+ return name_service.split(",")[0] # default to return the first nameservice
|
|
|
+ return name_service
|