@@ -21,6 +21,8 @@ Ambari Agent
"""
import re
import os
+import grp
+import pwd
import time
from resource_management.core.environment import Environment
from resource_management.core.base import Fail
@@ -63,10 +65,10 @@ class HdfsResourceJar:
  """
  This is slower than HdfsResourceWebHDFS implementation of HdfsResouce, but it works in any cases on any DFS types.

-  The idea is to put all the files/directories/copyFromLocals we have to create/delete into a json file.
-  And then create in it with ONLY ONE expensive hadoop call to our custom jar fast-hdfs-resource.jar which grabs this json.
+  The idea is to put all the files/directories/copyFromLocals/copyToLocals we have to create/delete into a json file.
+  And then perform them with ONLY ONE expensive hadoop call to our custom jar fast-hdfs-resource.jar which grabs this json.

-  'create_and_execute' and 'delete_on_execute' does nothing but add files/directories to this json,
+  'create_and_execute', 'delete_on_execute' and 'download_on_execute' do nothing except add actions to this json,
  while execute does all the expensive creating/deleting work executing the jar with the json as parameter.
  """
  def action_delayed(self, action_name, main_resource):
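
Note: for readers new to this delayed-action pattern, the caller side looks roughly like the sketch below. It is illustrative only: the paths are made up and params.HdfsResource stands for the pre-configured wrapper that Ambari service scripts usually define around this resource.

  # Queue work; nothing touches the DFS yet, each call only records an action.
  params.HdfsResource("/user/oozie/share",
                      type="directory",
                      action="create_on_execute",
                      owner="oozie",
                      mode=0755)
  params.HdfsResource("/tmp/obsolete_dir",
                      type="directory",
                      action="delete_on_execute")
  # Flush the queue: with HdfsResourceJar this dumps the recorded actions to a
  # json file and invokes fast-hdfs-resource.jar exactly once.
  params.HdfsResource(None, action="execute")
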
@@ -96,7 +98,7 @@ class HdfsResourceJar:
    main_resource.assert_parameter_is_set('user')

    if not 'hdfs_files' in env.config or not env.config['hdfs_files']:
-      Logger.info("No resources to create. 'create_on_execute' or 'delete_on_execute' wasn't triggered before this 'execute' action.")
+      Logger.info("No resources to create. 'create_on_execute' or 'delete_on_execute' or 'download_on_execute' wasn't triggered before this 'execute' action.")
      return

    hadoop_bin_dir = main_resource.resource.hadoop_bin_dir
@@ -163,13 +165,18 @@ class WebHDFSUtil:
    for k,v in kwargs.iteritems():
      url = format("{url}&{k}={v}")

-    if file_to_put and not os.path.exists(file_to_put):
-      raise Fail(format("File {file_to_put} is not found."))
-
    cmd = ["curl", "-sS","-L", "-w", "%{http_code}", "-X", method]
-
-    if file_to_put:
-      cmd += ["--data-binary", "@"+file_to_put, "-H", "Content-Type: application/octet-stream"]
+
+    # When operation is "OPEN", target is the DFS file to download and file_to_put is the local destination path; see _download_file
+    if operation == "OPEN":
+      cmd += ["-o", file_to_put]
+    else:
+      if file_to_put and not os.path.exists(file_to_put):
+        raise Fail(format("File {file_to_put} is not found."))
+
+      if file_to_put:
+        cmd += ["--data-binary", "@"+file_to_put, "-H", "Content-Type: application/octet-stream"]
+
    if self.security_enabled:
      cmd += ["--negotiate", "-u", ":"]
    if self.is_https_enabled:
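
Note: the effect of the new OPEN branch is easiest to see from the argument list run_command ends up with; a rough sketch, with /tmp/mapreduce.tar.gz standing in for the local file (the kerberos/SSL flags from the context lines above, and eventually the request URL, are appended after this):

  # operation == "OPEN" (download, method "GET"): the response body is written to the local path
  cmd = ["curl", "-sS", "-L", "-w", "%{http_code}", "-X", "GET", "-o", "/tmp/mapreduce.tar.gz"]
  # any other operation with a local payload, e.g. CREATE (upload): the old shape is kept
  cmd = ["curl", "-sS", "-L", "-w", "%{http_code}", "-X", "PUT",
         "--data-binary", "@/tmp/mapreduce.tar.gz", "-H", "Content-Type: application/octet-stream"]
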
@@ -232,6 +239,28 @@ class HdfsResourceWebHDFS:

    if self.target_status and self.target_status['type'].lower() != type:
      raise Fail(format("Trying to create file/directory but directory/file exists in the DFS on {target}"))
+
+  def _assert_download_valid(self):
+    source = self.main_resource.resource.source
+    type = self.main_resource.resource.type
+    target = self.main_resource.resource.target
+
+    if source:
+      self.source_status = self._get_file_status(source)
+      if self.source_status == None:
+        raise Fail(format("Source {source} doesn't exist"))
+      if type == "directory" and self.source_status['type'] == "FILE":
+        raise Fail(format("Source {source} is file but type is {type}"))
+      elif type == "file" and self.source_status['type'] == "DIRECTORY":
+        raise Fail(format("Source {source} is directory but type is {type}"))
+    else:
+      raise Fail(format("No source provided"))
+
+    if os.path.exists(target):
+      if type == "directory" and os.path.isfile(target):
+        raise Fail(format("Trying to download directory but file exists locally {target}"))
+      elif type == "file" and os.path.isdir(target):
+        raise Fail(format("Trying to download file but directory exists locally {target}"))

  def action_delayed(self, action_name, main_resource):
    main_resource.assert_parameter_is_set('user')
@@ -244,7 +273,10 @@ class HdfsResourceWebHDFS:
    self.mode = oct(main_resource.resource.mode)[1:] if main_resource.resource.mode else main_resource.resource.mode
    self.mode_set = False
    self.main_resource = main_resource
-    self._assert_valid()
+    if action_name == "download":
+      self._assert_download_valid()
+    else:
+      self._assert_valid()

    if self.main_resource.manage_if_exists == False and self.target_status:
      Logger.info("Skipping the operation for not managed DFS directory " + str(self.main_resource.resource.target) +
@@ -255,6 +287,8 @@ class HdfsResourceWebHDFS:
      self._create_resource()
      self._set_mode(self.target_status)
      self._set_owner(self.target_status)
+    elif action_name == "download":
+      self._download_resource()
    else:
      self._delete_resource()

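
Note: the next hunk adds the local-download path. It leans on the stock WebHDFS JSON responses; the fields the new code reads (pathSuffix, type, length) come from a LISTSTATUS payload shaped roughly like this trimmed, made-up example (real responses carry more fields such as owner, group and permission):

  liststatus_response = {
    "FileStatuses": {
      "FileStatus": [
        {"pathSuffix": "mapreduce.tar.gz", "type": "FILE", "length": 21656},
        {"pathSuffix": "conf", "type": "DIRECTORY", "length": 0}
      ]
    }
  }
  # _list_directory() below unwraps this to the list under FileStatus, while
  # GETFILESTATUS returns a single {"FileStatus": {...}} object, matching _get_file_status().
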
@@ -282,16 +316,78 @@ class HdfsResourceWebHDFS:
      else:
        self._create_file(new_target, new_source)

+  def _download_resource(self):
+    if self.main_resource.resource.source == None:
+      return
+
+    if self.main_resource.resource.type == "file":
+      self._download_file(self.main_resource.resource.target, self.main_resource.resource.source, self.source_status)
+    elif self.main_resource.resource.type == "directory":
+      self._download_directory(self.main_resource.resource.target, self.main_resource.resource.source)
+
+  def _download_directory(self, target, source):
+    self._create_local_directory(target)
+
+    for file_status in self._list_directory(source):
+      if not file_status == None:
+        next_path_part = file_status['pathSuffix']
+        new_source = format("{source}/{next_path_part}")
+        new_target = os.path.join(target, next_path_part)
+        if file_status['type'] == "DIRECTORY":
+          self._download_directory(new_target, new_source)
+        else:
+          self._download_file(new_target, new_source, file_status)
+
+  def _create_local_directory(self, target):
+    if not os.path.exists(target):
+      Logger.info(format("Creating local directory {target}"))
+      sudo.makedir(target, "")
+
+      owner_name = "" if not self.main_resource.resource.owner else self.main_resource.resource.owner
+      group_name = "" if not self.main_resource.resource.group else self.main_resource.resource.group
+      owner = pwd.getpwnam(owner_name)
+      group = grp.getgrnam(group_name)
+      sudo.chown(target, owner, group)
+
+  def _download_file(self, target, source, file_status):
+    """
+    Downloading a file is slow, however _get_file_status is pretty fast,
+    so we should check if the file really should be downloaded before doing it.
+    """
+
+    if file_status and os.path.exists(target):
+      length = file_status['length']
+      local_file_size = os.stat(target).st_size # TODO: os -> sudo
+
+      # TODO: re-implement this using checksums
+      if local_file_size == length:
+        Logger.info(format("DFS file {source} is identical to {target}, skipping the download"))
+        return
+      elif not self.main_resource.resource.replace_existing_files:
+        Logger.info(format("Not replacing existing local file {target} which is different from DFS file {source}, due to replace_existing_files=False"))
+        return
+
+    kwargs = {}
+    self.util.run_command(source, 'OPEN', method='GET', overwrite=True, assertable_result=False, file_to_put=target, **kwargs)
+
+
  def _create_directory(self, target):
    if target == self.main_resource.resource.target and self.target_status:
      return
-
+
    self.util.run_command(target, 'MKDIRS', method='PUT')
-
+
  def _get_file_status(self, target):
    list_status = self.util.run_command(target, 'GETFILESTATUS', method='GET', ignore_status_codes=['404'], assertable_result=False)
    return list_status['FileStatus'] if 'FileStatus' in list_status else None
-
+
+  def _list_directory(self, target):
+    results = self.util.run_command(target, 'LISTSTATUS', method='GET', ignore_status_codes=['404'], assertable_result=False)
+    entry = results['FileStatuses'] if 'FileStatuses' in results else None
+    if entry == None:
+      return []
+    return entry['FileStatus'] if 'FileStatus' in entry else []
+
  def _create_file(self, target, source=None, mode=""):
    """
    PUT file command in slow, however _get_file_status is pretty fast,
@@ -299,12 +395,12 @@ class HdfsResourceWebHDFS:
    """
    file_status = self._get_file_status(target) if target!=self.main_resource.resource.target else self.target_status
    mode = "" if not mode else mode
-
+
    if file_status:
      if source:
        length = file_status['length']
        local_file_size = os.stat(source).st_size # TODO: os -> sudo
-
+
        # TODO: re-implement this using checksums
        if local_file_size == length:
          Logger.info(format("DFS file {target} is identical to {source}, skipping the copying"))
@@ -315,16 +411,16 @@ class HdfsResourceWebHDFS:
      else:
        Logger.info(format("File {target} already exists in DFS, skipping the creation"))
        return
-
+
    Logger.info(format("Creating new file {target} in DFS"))
    kwargs = {'permission': mode} if mode else {}
-
+
    self.util.run_command(target, 'CREATE', method='PUT', overwrite=True, assertable_result=False, file_to_put=source, **kwargs)
-
+
    if mode and file_status:
      file_status['permission'] = mode
-
-
+
+
  def _delete_resource(self):
    if not self.target_status:
      return
@@ -336,14 +432,14 @@

    if not self.main_resource.resource.recursive_chown and (not owner or file_status and file_status['owner'] == owner) and (not group or file_status and file_status['group'] == group):
      return
-
+
    self.util.run_command(self.main_resource.resource.target, 'SETOWNER', method='PUT', owner=owner, group=group, assertable_result=False)
-
+
    results = []
-
+
    if self.main_resource.resource.recursive_chown:
      content_summary = self.util.run_command(self.main_resource.resource.target, 'GETCONTENTSUMMARY', method='GET', assertable_result=False)
-
+
      if content_summary['ContentSummary']['fileCount'] <= HdfsResourceWebHDFS.MAX_FILES_FOR_RECURSIVE_ACTION_VIA_WEBHDFS and content_summary['ContentSummary']['directoryCount'] <= HdfsResourceWebHDFS.MAX_DIRECTORIES_FOR_RECURSIVE_ACTION_VIA_WEBHDFS:
        self._fill_directories_list(self.main_resource.resource.target, results)
      else: # avoid chmowning a lot of files and listing a lot dirs via webhdfs which can take a lot of time.
@@ -351,34 +447,33 @@

    if self.main_resource.resource.change_permissions_for_parents:
      self._fill_in_parent_directories(self.main_resource.resource.target, results)
-
+
    for path in results:
      self.util.run_command(path, 'SETOWNER', method='PUT', owner=owner, group=group, assertable_result=False)
-
+
  def _set_mode(self, file_status=None):
    if not self.mode or file_status and file_status['permission'] == self.mode:
      return
-
+
    if not self.mode_set:
      self.util.run_command(self.main_resource.resource.target, 'SETPERMISSION', method='PUT', permission=self.mode, assertable_result=False)
-
+
    results = []
-
+
    if self.main_resource.resource.recursive_chmod:
      content_summary = self.util.run_command(self.main_resource.resource.target, 'GETCONTENTSUMMARY', method='GET', assertable_result=False)
-
+
      if content_summary['ContentSummary']['fileCount'] <= HdfsResourceWebHDFS.MAX_FILES_FOR_RECURSIVE_ACTION_VIA_WEBHDFS and content_summary['ContentSummary']['directoryCount'] <= HdfsResourceWebHDFS.MAX_DIRECTORIES_FOR_RECURSIVE_ACTION_VIA_WEBHDFS:
        self._fill_directories_list(self.main_resource.resource.target, results)
      else: # avoid chmoding a lot of files and listing a lot dirs via webhdfs which can take a lot of time.
        shell.checked_call(["hadoop", "fs", "-chmod", "-R", self.mode, self.main_resource.resource.target], user=self.main_resource.resource.user)
-
+
    if self.main_resource.resource.change_permissions_for_parents:
      self._fill_in_parent_directories(self.main_resource.resource.target, results)
-
+
    for path in results:
      self.util.run_command(path, 'SETPERMISSION', method='PUT', permission=self.mode, assertable_result=False)
-
-
+
  def _fill_in_parent_directories(self, target, results):
    path_parts = HdfsResourceProvider.parse_path(target).split("/")[1:]# [1:] remove '' from parts
    path = "/"
@@ -386,18 +481,19 @@ class HdfsResourceWebHDFS:
    for path_part in path_parts:
      path += path_part + "/"
      results.append(path)
-
+
+
  def _fill_directories_list(self, target, results):
    list_status = self.util.run_command(target, 'LISTSTATUS', method='GET', assertable_result=False)['FileStatuses']['FileStatus']
-
+
    for file in list_status:
      if file['pathSuffix']:
        new_path = target + "/" + file['pathSuffix']
        results.append(new_path)
-
+
      if file['type'] == 'DIRECTORY':
        self._fill_directories_list(new_path, results)
-
+
class HdfsResourceProvider(Provider):
  def __init__(self, resource):
    super(HdfsResourceProvider,self).__init__(resource)
@@ -461,6 +557,9 @@ class HdfsResourceProvider(Provider):
  def action_delete_on_execute(self):
    self.action_delayed("delete")

+  def action_download_on_execute(self):
+    self.action_delayed("download")
+
  def action_execute(self):
    self.get_hdfs_resource_executor().action_execute(self)

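
Note: with the provider action in place, a service script can pull content out of the DFS the same way it pushes it; a minimal, made-up sketch assuming the usual params.HdfsResource wrapper:

  params.HdfsResource("/var/lib/ambari-agent/tmp/sharelib_copy",
                      type="directory",
                      action="download_on_execute",
                      source="/user/oozie/share/lib")
  params.HdfsResource(None, action="execute")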