|
@@ -30,16 +30,16 @@ from resource_management.libraries.resources.execute_hadoop import ExecuteHadoop
|
|
from resource_management.core.resources.system import Execute
|
|
from resource_management.core.resources.system import Execute
|
|
from resource_management.core.exceptions import Fail
|
|
from resource_management.core.exceptions import Fail
|
|
from resource_management.core.logger import Logger
|
|
from resource_management.core.logger import Logger
|
|
|
|
+from resource_management.core import shell
|
|
|
|
|
|
"""
|
|
"""
|
|
This file provides helper methods needed for the versioning of RPMs. Specifically, it does dynamic variable
|
|
This file provides helper methods needed for the versioning of RPMs. Specifically, it does dynamic variable
|
|
-interpretation to replace strings like {{ hdp_stack_version }} and {{ component_version }} where the value of the
|
|
|
|
|
|
+interpretation to replace strings like {{ hdp_stack_version }} where the value of the
|
|
variables cannot be determined ahead of time, but rather, depends on what files are found.
|
|
variables cannot be determined ahead of time, but rather, depends on what files are found.
|
|
|
|
|
|
It assumes that {{ hdp_stack_version }} is constructed as ${major.minor.patch.rev}-${build_number}
|
|
It assumes that {{ hdp_stack_version }} is constructed as ${major.minor.patch.rev}-${build_number}
|
|
E.g., 998.2.2.1.0-998
|
|
E.g., 998.2.2.1.0-998
|
|
Please note that "-${build_number}" is optional.
|
|
Please note that "-${build_number}" is optional.
|
|
-Whereas {{ component_version }} is up to the Component to define, may be 3.0.1 or 301.
|
|
|
|
"""
|
|
"""
|
|
|
|
|
|
# These values must be the suffix of the properties in cluster-env.xml
|
|
# These values must be the suffix of the properties in cluster-env.xml
|
|
@@ -53,7 +53,7 @@ def _get_tar_source_and_dest_folder(tarball_prefix):
|
|
:return: Returns a tuple of (x, y) after verifying the properties
|
|
:return: Returns a tuple of (x, y) after verifying the properties
|
|
"""
|
|
"""
|
|
component_tar_source_file = default("/configurations/cluster-env/%s%s" % (tarball_prefix.lower(), TAR_SOURCE_SUFFIX), None)
|
|
component_tar_source_file = default("/configurations/cluster-env/%s%s" % (tarball_prefix.lower(), TAR_SOURCE_SUFFIX), None)
|
|
- # E.g., /usr/hdp/current/hadoop-client/tez-{{ component_version }}.{{ hdp_stack_version }}.tar.gz
|
|
|
|
|
|
+ # E.g., /usr/hdp/current/hadoop-client/tez-{{ hdp_stack_version }}.tar.gz
|
|
|
|
|
|
component_tar_destination_folder = default("/configurations/cluster-env/%s%s" % (tarball_prefix.lower(), TAR_DESTINATION_FOLDER_SUFFIX), None)
|
|
component_tar_destination_folder = default("/configurations/cluster-env/%s%s" % (tarball_prefix.lower(), TAR_DESTINATION_FOLDER_SUFFIX), None)
|
|
# E.g., hdfs:///hdp/apps/{{ hdp_stack_version }}/mapreduce/
|
|
# E.g., hdfs:///hdp/apps/{{ hdp_stack_version }}/mapreduce/
|
|
@@ -76,77 +76,12 @@ def _get_tar_source_and_dest_folder(tarball_prefix):
|
|
return component_tar_source_file, component_tar_destination_folder
|
|
return component_tar_source_file, component_tar_destination_folder
|
|
|
|
|
|
|
|
|
|
-def _create_regex_pattern(file_path, hdp_stack_version):
|
|
|
|
- """
|
|
|
|
- :param file_path: Input file path
|
|
|
|
- :param hdp_stack_version: Stack version, such as 2.2.0.0
|
|
|
|
- :return: Returns an expression that uses file system regex that can be used with ls and hadoop fs -ls
|
|
|
|
- """
|
|
|
|
- # Perform the variable interpretation
|
|
|
|
- file_path_pattern = file_path
|
|
|
|
- if "{{ component_version }}" in file_path_pattern:
|
|
|
|
- file_path_pattern = file_path_pattern.replace("{{ component_version }}", "*")
|
|
|
|
-
|
|
|
|
- # IMPORTANT, the build version was used in HDP 2.2, but may not be needed in future versions.
|
|
|
|
- if "{{ hdp_stack_version }}" in file_path_pattern:
|
|
|
|
- file_path_pattern = file_path_pattern.replace("{{ hdp_stack_version }}", hdp_stack_version + "*") # the trailing "*" is the optional build number
|
|
|
|
- return file_path_pattern
|
|
|
|
-
|
|
|
|
-
|
|
|
|
-def _populate_source_and_dests(tarball_prefix, source_file_pattern, component_tar_destination_folder, hdp_stack_version):
|
|
|
|
- """
|
|
|
|
- :param tarball_prefix: Prefix of the tarball must be one of tez, hive, mr, pig
|
|
|
|
- :param source_file_pattern: Regex pattern of the source file from the local file system
|
|
|
|
- :param component_tar_destination_folder: Destination folder to copy the file to in HDFS
|
|
|
|
- :param hdp_stack_version: Stack version number without the build version. E.g., 2.2.0.0
|
|
|
|
- :return: Returns a list of tuples (x, y), where x is the source file in the local file system,
|
|
|
|
- and y is the destination file path in HDFS
|
|
|
|
- """
|
|
|
|
- source_and_dest_pairs = []
|
|
|
|
-
|
|
|
|
- for file in glob.glob(source_file_pattern):
|
|
|
|
- file_base_name = os.path.basename(file)
|
|
|
|
- component_version = None
|
|
|
|
- hdp_build_version = None
|
|
|
|
-
|
|
|
|
- # Attempt to retrieve the hdp_build_version and component_version.
|
|
|
|
- # In case the build number (which is optional) has dots, attempt to match as many as possible.
|
|
|
|
- pattern = "%s-(.*)\\.%s-?([0-9\\.]*)\\..*" % (tarball_prefix, str(hdp_stack_version).replace(".", "\\."))
|
|
|
|
- m = re.search(pattern, file_base_name)
|
|
|
|
- if m and len(m.groups()) == 2:
|
|
|
|
- component_version = str(m.group(1))
|
|
|
|
- hdp_build_version = str(m.group(2)) # optional, so may be empty.
|
|
|
|
-
|
|
|
|
- missing_a_variable = False
|
|
|
|
- # The destination_file_path will be interpreted as well.
|
|
|
|
- destination_file_path = os.path.join(component_tar_destination_folder, file_base_name)
|
|
|
|
-
|
|
|
|
- if "{{ component_version }}" in destination_file_path:
|
|
|
|
- if component_version:
|
|
|
|
- destination_file_path = destination_file_path.replace("{{ component_version }}", component_version)
|
|
|
|
- else:
|
|
|
|
- missing_a_variable = True
|
|
|
|
-
|
|
|
|
- if "{{ hdp_stack_version }}" in destination_file_path:
|
|
|
|
- if hdp_build_version and hdp_build_version.strip() != "":
|
|
|
|
- destination_file_path = destination_file_path.replace("{{ hdp_stack_version }}", "%s-%s" %
|
|
|
|
- (hdp_stack_version, hdp_build_version))
|
|
|
|
- else:
|
|
|
|
- destination_file_path = destination_file_path.replace("{{ hdp_stack_version }}", "%s" % hdp_stack_version)
|
|
|
|
-
|
|
|
|
- if missing_a_variable:
|
|
|
|
- print("WARNING. Could not identify Component version in file %s , "
|
|
|
|
- "so will not copy to HDFS." % str(file))
|
|
|
|
- else:
|
|
|
|
- source_and_dest_pairs.append((file, destination_file_path))
|
|
|
|
- return source_and_dest_pairs
|
|
|
|
-
|
|
|
|
-
|
|
|
|
-def _copy_files(source_and_dest_pairs, file_owner, kinit_if_needed):
|
|
|
|
|
|
+def _copy_files(source_and_dest_pairs, file_owner, group_owner, kinit_if_needed):
|
|
"""
|
|
"""
|
|
:param source_and_dest_pairs: List of tuples (x, y), where x is the source file in the local file system,
|
|
:param source_and_dest_pairs: List of tuples (x, y), where x is the source file in the local file system,
|
|
and y is the destination file path in HDFS
|
|
and y is the destination file path in HDFS
|
|
- :param file_owner: Owner to set for the file copied to HDFS
|
|
|
|
|
|
+ :param file_owner: Owner to set for the file copied to HDFS (typically hdfs account)
|
|
|
|
+ :param group_owner: Owning group to set for the file copied to HDFS (typically hadoop group)
|
|
:param kinit_if_needed: kinit command if it is needed, otherwise an empty string
|
|
:param kinit_if_needed: kinit command if it is needed, otherwise an empty string
|
|
:return: Returns 0 if at least one file was copied and no exceptions occurred, and 1 otherwise.
|
|
:return: Returns 0 if at least one file was copied and no exceptions occurred, and 1 otherwise.
|
|
|
|
|
|
@@ -164,12 +99,13 @@ def _copy_files(source_and_dest_pairs, file_owner, kinit_if_needed):
|
|
params.HdfsDirectory(destination_dir,
|
|
params.HdfsDirectory(destination_dir,
|
|
action="create",
|
|
action="create",
|
|
owner=file_owner,
|
|
owner=file_owner,
|
|
- mode=0777
|
|
|
|
|
|
+ mode=0555
|
|
)
|
|
)
|
|
|
|
|
|
CopyFromLocal(source,
|
|
CopyFromLocal(source,
|
|
- mode=0755,
|
|
|
|
|
|
+ mode=0444,
|
|
owner=file_owner,
|
|
owner=file_owner,
|
|
|
|
+ group=group_owner,
|
|
dest_dir=destination_dir,
|
|
dest_dir=destination_dir,
|
|
kinnit_if_needed=kinit_if_needed,
|
|
kinnit_if_needed=kinit_if_needed,
|
|
hdfs_user=params.hdfs_user,
|
|
hdfs_user=params.hdfs_user,
|
|
@@ -181,11 +117,12 @@ def _copy_files(source_and_dest_pairs, file_owner, kinit_if_needed):
|
|
return return_value
|
|
return return_value
|
|
|
|
|
|
|
|
|
|
-def copy_tarballs_to_hdfs(tarball_prefix, component_user, file_owner):
|
|
|
|
|
|
+def copy_tarballs_to_hdfs(tarball_prefix, component_user, file_owner, group_owner):
|
|
"""
|
|
"""
|
|
:param tarball_prefix: Prefix of the tarball must be one of tez, hive, mr, pig
|
|
:param tarball_prefix: Prefix of the tarball must be one of tez, hive, mr, pig
|
|
:param component_user: User that will execute the Hadoop commands
|
|
:param component_user: User that will execute the Hadoop commands
|
|
- :param file_owner: Owner of the files copied to HDFS
|
|
|
|
|
|
+ :param file_owner: Owner of the files copied to HDFS (typically hdfs account)
|
|
|
|
+ :param group_owner: Group owner of the files copied to HDFS (typically hadoop group)
|
|
:return: Returns 0 on success, 1 if no files were copied, and in some cases may raise an exception.
|
|
:return: Returns 0 on success, 1 if no files were copied, and in some cases may raise an exception.
|
|
|
|
|
|
In order to call this function, params.py must have all of the following,
|
|
In order to call this function, params.py must have all of the following,
|
|
@@ -200,16 +137,27 @@ def copy_tarballs_to_hdfs(tarball_prefix, component_user, file_owner):
|
|
|
|
|
|
component_tar_source_file, component_tar_destination_folder = _get_tar_source_and_dest_folder(tarball_prefix)
|
|
component_tar_source_file, component_tar_destination_folder = _get_tar_source_and_dest_folder(tarball_prefix)
|
|
if not component_tar_source_file or not component_tar_destination_folder:
|
|
if not component_tar_source_file or not component_tar_destination_folder:
|
|
|
|
+ Logger.warning("Could not retrieve properties for tarball with prefix: %s" % str(tarball_prefix))
|
|
|
|
+ return 1
|
|
|
|
+
|
|
|
|
+ if not os.path.exists(component_tar_source_file):
|
|
|
|
+ Logger.warning("Could not find file: %s" % str(component_tar_source_file))
|
|
return 1
|
|
return 1
|
|
|
|
|
|
- source_file_pattern = _create_regex_pattern(component_tar_source_file, params.hdp_stack_version)
|
|
|
|
- # This is just the last segment
|
|
|
|
- file_name_pattern = source_file_pattern.split('/')[-1:][0]
|
|
|
|
- tar_destination_folder_pattern = _create_regex_pattern(component_tar_destination_folder, params.hdp_stack_version)
|
|
|
|
|
|
+ get_hdp_version_cmd = "/usr/bin/hdp-select versions"
|
|
|
|
+ code, out = shell.call(get_hdp_version_cmd)
|
|
|
|
+ if code != 0 or not out.startswith(params.hdp_stack_version):
|
|
|
|
+ Logger.warning("Could not verify HDP version by calling '%s'. Return Code: %s, Output: %s." %
|
|
|
|
+ (get_hdp_version_cmd, str(code), str(out)))
|
|
|
|
+ return 1
|
|
|
|
+
|
|
|
|
+ hdp_version = out.strip() # this should include the build number
|
|
|
|
+
|
|
|
|
+ file_name = os.path.basename(component_tar_source_file)
|
|
|
|
+ destination_file = os.path.join(component_tar_destination_folder, file_name)
|
|
|
|
+ destination_file = destination_file.replace("{{ hdp_stack_version }}", hdp_version)
|
|
|
|
|
|
- # Pattern for searching the file in HDFS. E.g. value, hdfs:///hdp/apps/2.2.0.0*/tez/tez-*.2.2.0.0*.tar.gz
|
|
|
|
- hdfs_file_pattern = os.path.join(tar_destination_folder_pattern, file_name_pattern)
|
|
|
|
- does_hdfs_file_exist_cmd = "fs -ls %s" % hdfs_file_pattern
|
|
|
|
|
|
+ does_hdfs_file_exist_cmd = "fs -ls %s" % destination_file
|
|
|
|
|
|
kinit_if_needed = ""
|
|
kinit_if_needed = ""
|
|
if params.security_enabled:
|
|
if params.security_enabled:
|
|
@@ -234,7 +182,6 @@ def copy_tarballs_to_hdfs(tarball_prefix, component_user, file_owner):
|
|
pass
|
|
pass
|
|
|
|
|
|
if not does_hdfs_file_exist:
|
|
if not does_hdfs_file_exist:
|
|
- source_and_dest_pairs = _populate_source_and_dests(tarball_prefix, source_file_pattern,
|
|
|
|
- component_tar_destination_folder, params.hdp_stack_version)
|
|
|
|
- return _copy_files(source_and_dest_pairs, file_owner, kinit_if_needed)
|
|
|
|
|
|
+ source_and_dest_pairs = [(component_tar_source_file, destination_file), ]
|
|
|
|
+ return _copy_files(source_and_dest_pairs, file_owner, group_owner, kinit_if_needed)
|
|
return 1
|
|
return 1
|