#!/usr/bin/env python
'''
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
'''
import StringIO
import logging
import os
import shutil
import urllib
import urllib2
import zipfile

from AmbariConfig import AmbariConfig
# Logger shared by everything in this module. getLogger() called without a
# name returns the root logger, so records are emitted on the root logger
# itself rather than on a per-module child logger.
logger = logging.getLogger()
  26. class CachingException(Exception):
  27. pass
  28. class FileCache():
  29. """
  30. Provides caching and lookup for service metadata files.
  31. If service metadata is not available at cache,
  32. downloads relevant files from the server.
  33. """
  34. CLUSTER_CONFIGURATION_CACHE_DIRECTORY="cluster_configuration"
  35. ALERTS_CACHE_DIRECTORY="alerts"
  36. RECOVERY_CACHE_DIRECTORY="recovery"
  37. STACKS_CACHE_DIRECTORY="stacks"
  38. COMMON_SERVICES_DIRECTORY="common-services"
  39. CUSTOM_ACTIONS_CACHE_DIRECTORY="custom_actions"
  40. HOST_SCRIPTS_CACHE_DIRECTORY="host_scripts"
  41. HASH_SUM_FILE=".hash"
  42. ARCHIVE_NAME="archive.zip"
  43. ENABLE_AUTO_AGENT_CACHE_UPDATE_KEY = "agent.auto.cache.update"
  44. BLOCK_SIZE=1024*16
  45. SOCKET_TIMEOUT=10
  46. def __init__(self, config):
  47. self.service_component_pool = {}
  48. self.config = config
  49. self.cache_dir = config.get('agent', 'cache_dir')
  50. # Defines whether command should fail when downloading scripts
  51. # from the server is not possible or agent should rollback to local copy
  52. self.tolerate_download_failures = \
  53. config.get('agent','tolerate_download_failures').lower() == 'true'
  54. self.reset()
  55. def reset(self):
  56. self.uptodate_paths = [] # Paths that already have been recently checked
  57. def get_service_base_dir(self, command, server_url_prefix):
  58. """
  59. Returns a base directory for service
  60. """
  61. service_subpath = command['commandParams']['service_package_folder']
  62. return self.provide_directory(self.cache_dir, service_subpath,
  63. server_url_prefix)
  64. def get_hook_base_dir(self, command, server_url_prefix):
  65. """
  66. Returns a base directory for hooks
  67. """
  68. try:
  69. hooks_subpath = command['commandParams']['hooks_folder']
  70. except KeyError:
  71. return None
  72. subpath = os.path.join(self.STACKS_CACHE_DIRECTORY, hooks_subpath)
  73. return self.provide_directory(self.cache_dir, subpath,
  74. server_url_prefix)
  75. def get_custom_actions_base_dir(self, server_url_prefix):
  76. """
  77. Returns a base directory for custom action scripts
  78. """
  79. return self.provide_directory(self.cache_dir,
  80. self.CUSTOM_ACTIONS_CACHE_DIRECTORY,
  81. server_url_prefix)
  82. def get_host_scripts_base_dir(self, server_url_prefix):
  83. """
  84. Returns a base directory for host scripts (host alerts, etc) which
  85. are scripts that are not part of the main agent code
  86. """
  87. return self.provide_directory(self.cache_dir,
  88. self.HOST_SCRIPTS_CACHE_DIRECTORY,
  89. server_url_prefix)
  90. def auto_cache_update_enabled(self):
  91. if self.config and \
  92. self.config.has_option(AmbariConfig.AMBARI_PROPERTIES_CATEGORY, FileCache.ENABLE_AUTO_AGENT_CACHE_UPDATE_KEY) and \
  93. self.config.get(AmbariConfig.AMBARI_PROPERTIES_CATEGORY, FileCache.ENABLE_AUTO_AGENT_CACHE_UPDATE_KEY).lower() == "false":
  94. return False
  95. return True
  96. def provide_directory(self, cache_path, subdirectory, server_url_prefix):
  97. """
  98. Ensures that directory at cache is up-to-date. Throws a CachingException
  99. if any problems occur
  100. Parameters;
  101. cache_path: full path to cache directory
  102. subdirectory: subpath inside cache
  103. server_url_prefix: url of "resources" folder at the server
  104. """
  105. full_path = os.path.join(cache_path, subdirectory)
  106. logger.debug("Trying to provide directory {0}".format(subdirectory))
  107. if not self.auto_cache_update_enabled():
  108. logger.debug("Auto cache update is disabled.")
  109. return full_path
  110. try:
  111. if full_path not in self.uptodate_paths:
  112. logger.debug("Checking if update is available for "
  113. "directory {0}".format(full_path))
  114. # Need to check for updates at server
  115. remote_url = self.build_download_url(server_url_prefix,
  116. subdirectory, self.HASH_SUM_FILE)
  117. memory_buffer = self.fetch_url(remote_url)
  118. remote_hash = memory_buffer.getvalue().strip()
  119. local_hash = self.read_hash_sum(full_path)
  120. if not local_hash or local_hash != remote_hash:
  121. logger.debug("Updating directory {0}".format(full_path))
  122. download_url = self.build_download_url(server_url_prefix,
  123. subdirectory, self.ARCHIVE_NAME)
  124. membuffer = self.fetch_url(download_url)
  125. # extract only when the archive is not zero sized
  126. if (membuffer.getvalue().strip()):
  127. self.invalidate_directory(full_path)
  128. self.unpack_archive(membuffer, full_path)
  129. self.write_hash_sum(full_path, remote_hash)
  130. else:
  131. logger.warn("Skipping empty archive: {0}. "
  132. "Expected archive was not found. Cached copy will be used.".format(download_url))
  133. pass
  134. # Finally consider cache directory up-to-date
  135. self.uptodate_paths.append(full_path)
  136. except CachingException, e:
  137. if self.tolerate_download_failures:
  138. # ignore
  139. logger.warn("Error occurred during cache update. "
  140. "Error tolerate setting is set to true, so"
  141. " ignoring this error and continuing with current cache. "
  142. "Error details: {0}".format(str(e)))
  143. else:
  144. raise # we are not tolerant to exceptions, command execution will fail
  145. return full_path
  146. def build_download_url(self, server_url_prefix,
  147. directory, filename):
  148. """
  149. Builds up a proper download url for file. Used for downloading files
  150. from the server.
  151. directory - relative path
  152. filename - file inside directory we are trying to fetch
  153. """
  154. return "{0}/{1}/{2}".format(server_url_prefix,
  155. urllib.pathname2url(directory), filename)
  156. def fetch_url(self, url):
  157. """
  158. Fetches content on url to in-memory buffer and returns the resulting buffer.
  159. May throw exceptions because of various reasons
  160. """
  161. logger.debug("Trying to download {0}".format(url))
  162. try:
  163. memory_buffer = StringIO.StringIO()
  164. proxy_handler = urllib2.ProxyHandler({})
  165. opener = urllib2.build_opener(proxy_handler)
  166. u = opener.open(url, timeout=self.SOCKET_TIMEOUT)
  167. logger.debug("Connected with {0} with code {1}".format(u.geturl(),
  168. u.getcode()))
  169. buff = u.read(self.BLOCK_SIZE)
  170. while buff:
  171. memory_buffer.write(buff)
  172. buff = u.read(self.BLOCK_SIZE)
  173. if not buff:
  174. break
  175. return memory_buffer
  176. except Exception, err:
  177. raise CachingException("Can not download file from"
  178. " url {0} : {1}".format(url, str(err)))
  179. def read_hash_sum(self, directory):
  180. """
  181. Tries to read a hash sum from previously generated file. Returns string
  182. containing hash or None
  183. """
  184. hash_file = os.path.join(directory, self.HASH_SUM_FILE)
  185. try:
  186. with open(hash_file) as fh:
  187. return fh.readline().strip()
  188. except:
  189. return None # We don't care
  190. def write_hash_sum(self, directory, new_hash):
  191. """
  192. Tries to read a hash sum from previously generated file. Returns string
  193. containing hash or None
  194. """
  195. hash_file = os.path.join(directory, self.HASH_SUM_FILE)
  196. try:
  197. with open(hash_file, "w") as fh:
  198. fh.write(new_hash)
  199. os.chmod(hash_file, 0o666)
  200. except Exception, err:
  201. raise CachingException("Can not write to file {0} : {1}".format(hash_file,
  202. str(err)))
  203. def invalidate_directory(self, directory):
  204. """
  205. Recursively removes directory content (if any). Also, creates
  206. directory and any parent directories if needed. May throw exceptions
  207. on permission problems
  208. """
  209. logger.debug("Invalidating directory {0}".format(directory))
  210. try:
  211. if os.path.exists(directory):
  212. if os.path.isfile(directory): # It would be a strange situation
  213. os.unlink(directory)
  214. elif os.path.isdir(directory):
  215. shutil.rmtree(directory)
  216. # create directory itself and any parent directories
  217. os.makedirs(directory)
  218. except Exception, err:
  219. raise CachingException("Can not invalidate cache directory {0}: {1}",
  220. directory, str(err))
  221. def unpack_archive(self, mem_buffer, target_directory):
  222. """
  223. Unpacks contents of in-memory buffer to file system.
  224. In-memory buffer is expected to contain a valid zip archive
  225. """
  226. try:
  227. zfile = zipfile.ZipFile(mem_buffer)
  228. for name in zfile.namelist():
  229. (dirname, filename) = os.path.split(name)
  230. concrete_dir=os.path.abspath(os.path.join(target_directory, dirname))
  231. if not os.path.isdir(concrete_dir):
  232. os.makedirs(concrete_dir)
  233. logger.debug("Unpacking file {0} to {1}".format(name, concrete_dir))
  234. if filename!='':
  235. zfile.extract(name, target_directory)
  236. except Exception, err:
  237. raise CachingException("Can not unpack zip file to "
  238. "directory {0} : {1}".format(
  239. target_directory, str(err)))