FileCache.py

#!/usr/bin/env python
'''
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
'''

import StringIO
import logging
import os
import shutil
import zipfile
import urllib2
import urllib

from AmbariConfig import AmbariConfig

logger = logging.getLogger()


class CachingException(Exception):
  pass


class FileCache():
  """
  Provides caching and lookup for service metadata files.
  If service metadata is not available in the cache, the relevant
  files are downloaded from the server.
  """

  CLUSTER_CONFIGURATION_CACHE_DIRECTORY = "cluster_configuration"
  ALERTS_CACHE_DIRECTORY = "alerts"
  RECOVERY_CACHE_DIRECTORY = "recovery"
  STACKS_CACHE_DIRECTORY = "stacks"
  COMMON_SERVICES_DIRECTORY = "common-services"
  CUSTOM_ACTIONS_CACHE_DIRECTORY = "custom_actions"
  DASHBOARD_DIRECTORY = "dashboards"
  HOST_SCRIPTS_CACHE_DIRECTORY = "host_scripts"
  HASH_SUM_FILE = ".hash"
  ARCHIVE_NAME = "archive.zip"
  ENABLE_AUTO_AGENT_CACHE_UPDATE_KEY = "agent.auto.cache.update"
  BLOCK_SIZE = 1024 * 16
  SOCKET_TIMEOUT = 10
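
  # Each cached directory above is synchronized lazily: the generated
  # HASH_SUM_FILE (.hash) in the local copy is compared with the server's
  # copy, and ARCHIVE_NAME (archive.zip) is re-downloaded and unpacked on
  # mismatch (see provide_directory below).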

  def __init__(self, config):
    self.service_component_pool = {}
    self.config = config
    self.cache_dir = config.get('agent', 'cache_dir')
    # Defines whether a command should fail when scripts can not be downloaded
    # from the server, or whether the agent should fall back to the local copy
    self.tolerate_download_failures = \
      config.get('agent', 'tolerate_download_failures').lower() == 'true'
    self.reset()

  def reset(self):
    self.uptodate_paths = []  # Paths that have already been checked recently

  def get_service_base_dir(self, command, server_url_prefix):
    """
    Returns the base directory for a service
    """
    service_subpath = command['commandParams']['service_package_folder']
    return self.provide_directory(self.cache_dir, service_subpath,
                                  server_url_prefix)

  def get_hook_base_dir(self, command, server_url_prefix):
    """
    Returns the base directory for hooks
    """
    try:
      hooks_subpath = command['commandParams']['hooks_folder']
    except KeyError:
      return None
    subpath = os.path.join(self.STACKS_CACHE_DIRECTORY, hooks_subpath)
    return self.provide_directory(self.cache_dir, subpath,
                                  server_url_prefix)

  def get_custom_actions_base_dir(self, server_url_prefix):
    """
    Returns the base directory for custom action scripts
    """
    return self.provide_directory(self.cache_dir,
                                  self.CUSTOM_ACTIONS_CACHE_DIRECTORY,
                                  server_url_prefix)

  def get_dashboard_base_dir(self, server_url_prefix):
    """
    Returns the base directory for dashboards
    """
    return self.provide_directory(self.cache_dir,
                                  self.DASHBOARD_DIRECTORY,
                                  server_url_prefix)

  def get_host_scripts_base_dir(self, server_url_prefix):
    """
    Returns the base directory for host scripts (host alerts, etc.),
    i.e. scripts that are not part of the main agent code
    """
    return self.provide_directory(self.cache_dir,
                                  self.HOST_SCRIPTS_CACHE_DIRECTORY,
                                  server_url_prefix)

  def auto_cache_update_enabled(self):
    # Auto cache update is enabled unless agent.auto.cache.update is
    # explicitly set to "false"
    if self.config and \
        self.config.has_option(AmbariConfig.AMBARI_PROPERTIES_CATEGORY,
                               FileCache.ENABLE_AUTO_AGENT_CACHE_UPDATE_KEY) and \
        self.config.get(AmbariConfig.AMBARI_PROPERTIES_CATEGORY,
                        FileCache.ENABLE_AUTO_AGENT_CACHE_UPDATE_KEY).lower() == "false":
      return False
    return True

  def provide_directory(self, cache_path, subdirectory, server_url_prefix):
    """
    Ensures that the directory in the cache is up-to-date. Raises a
    CachingException if any problem occurs.

    Parameters:
    cache_path: full path to the cache directory
    subdirectory: subpath inside the cache
    server_url_prefix: url of the "resources" folder on the server
    """
    full_path = os.path.join(cache_path, subdirectory)
    logger.debug("Trying to provide directory {0}".format(subdirectory))
    if not self.auto_cache_update_enabled():
      logger.debug("Auto cache update is disabled.")
      return full_path
    try:
      if full_path not in self.uptodate_paths:
        logger.debug("Checking if update is available for "
                     "directory {0}".format(full_path))
        # Need to check for updates at the server
        remote_url = self.build_download_url(server_url_prefix,
                                             subdirectory, self.HASH_SUM_FILE)
        memory_buffer = self.fetch_url(remote_url)
        remote_hash = memory_buffer.getvalue().strip()
        local_hash = self.read_hash_sum(full_path)
        if not local_hash or local_hash != remote_hash:
          logger.debug("Updating directory {0}".format(full_path))
          download_url = self.build_download_url(server_url_prefix,
                                                 subdirectory, self.ARCHIVE_NAME)
          membuffer = self.fetch_url(download_url)
          # Extract only when the archive is not zero-sized
          if membuffer.getvalue().strip():
            self.invalidate_directory(full_path)
            self.unpack_archive(membuffer, full_path)
            self.write_hash_sum(full_path, remote_hash)
          else:
            logger.warn("Skipping empty archive: {0}. "
                        "Expected archive was not found. Cached copy will be used."
                        .format(download_url))
        # Finally, consider the cache directory up-to-date
        self.uptodate_paths.append(full_path)
    except CachingException, e:
      if self.tolerate_download_failures:
        # ignore
        logger.warn("Error occurred during cache update. "
                    "tolerate_download_failures is set to true, so "
                    "ignoring this error and continuing with the current cache. "
                    "Error details: {0}".format(str(e)))
      else:
        raise  # not tolerant to download failures, so command execution fails
    return full_path
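
  # Illustrative example (hypothetical values): with server_url_prefix
  # "http://ambari-server:8080/resources" and subdirectory
  # "stacks/HDP/2.0.6/hooks", provide_directory() compares the remote
  # <prefix>/stacks/HDP/2.0.6/hooks/.hash against the locally cached .hash
  # and, on mismatch, downloads and unpacks
  # <prefix>/stacks/HDP/2.0.6/hooks/archive.zip into the cache directory.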

  def build_download_url(self, server_url_prefix,
                         directory, filename):
    """
    Builds up a proper download url for a file. Used for downloading files
    from the server.
    directory - relative path
    filename - file inside the directory we are trying to fetch
    """
    return "{0}/{1}/{2}".format(server_url_prefix,
                                urllib.pathname2url(directory), filename)

  def fetch_url(self, url):
    """
    Fetches the content at url into an in-memory buffer and returns the
    resulting buffer. May raise exceptions for various reasons.
    """
    logger.debug("Trying to download {0}".format(url))
    try:
      memory_buffer = StringIO.StringIO()
      # An empty ProxyHandler dict bypasses any proxies configured in the
      # environment, so the agent always talks to the server directly
      proxy_handler = urllib2.ProxyHandler({})
      opener = urllib2.build_opener(proxy_handler)
      u = opener.open(url, timeout=self.SOCKET_TIMEOUT)
      logger.debug("Connected to {0} with code {1}".format(u.geturl(),
                                                           u.getcode()))
      # Read the response in BLOCK_SIZE chunks until EOF
      buff = u.read(self.BLOCK_SIZE)
      while buff:
        memory_buffer.write(buff)
        buff = u.read(self.BLOCK_SIZE)
      return memory_buffer
    except Exception, err:
      raise CachingException("Can not download file from"
                             " url {0} : {1}".format(url, str(err)))

  def read_hash_sum(self, directory):
    """
    Tries to read a hash sum from a previously generated file. Returns a
    string containing the hash, or None.
    """
    hash_file = os.path.join(directory, self.HASH_SUM_FILE)
    try:
      with open(hash_file) as fh:
        return fh.readline().strip()
    except:
      return None  # We don't care

  def write_hash_sum(self, directory, new_hash):
    """
    Writes a hash sum to the hash file in the given directory. Raises a
    CachingException if the file can not be written.
    """
    hash_file = os.path.join(directory, self.HASH_SUM_FILE)
    try:
      with open(hash_file, "w") as fh:
        fh.write(new_hash)
      os.chmod(hash_file, 0o666)
    except Exception, err:
      raise CachingException("Can not write to file {0} : {1}".format(hash_file,
                                                                      str(err)))

  def invalidate_directory(self, directory):
    """
    Recursively removes the directory content (if any). Also creates the
    directory and any parent directories if needed. May raise exceptions
    on permission problems.
    """
    logger.debug("Invalidating directory {0}".format(directory))
    try:
      if os.path.exists(directory):
        if os.path.isfile(directory):  # It would be a strange situation
          os.unlink(directory)
        elif os.path.isdir(directory):
          shutil.rmtree(directory)
      # Create the directory itself and any parent directories
      os.makedirs(directory)
    except Exception, err:
      raise CachingException("Can not invalidate cache directory {0}: {1}"
                             .format(directory, str(err)))

  def unpack_archive(self, mem_buffer, target_directory):
    """
    Unpacks the contents of an in-memory buffer to the file system.
    The buffer is expected to contain a valid zip archive.
    """
    try:
      zfile = zipfile.ZipFile(mem_buffer)
      for name in zfile.namelist():
        (dirname, filename) = os.path.split(name)
        concrete_dir = os.path.abspath(os.path.join(target_directory, dirname))
        if not os.path.isdir(concrete_dir):
          os.makedirs(concrete_dir)
        logger.debug("Unpacking file {0} to {1}".format(name, concrete_dir))
        if filename != '':
          zfile.extract(name, target_directory)
    except Exception, err:
      raise CachingException("Can not unpack zip file to "
                             "directory {0} : {1}".format(
                             target_directory, str(err)))
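

# ---------------------------------------------------------------------------
# Minimal usage sketch (not part of the original module). It shows how a
# caller might resolve the custom-actions cache directory. The config values
# and server URL below are hypothetical; a real agent passes its parsed
# AmbariConfig rather than the hand-built ConfigParser used here.
# ---------------------------------------------------------------------------
if __name__ == '__main__':
  import ConfigParser

  logging.basicConfig(level=logging.DEBUG)

  config = ConfigParser.RawConfigParser()
  config.add_section('agent')
  config.set('agent', 'cache_dir', '/var/lib/ambari-agent/cache')
  config.set('agent', 'tolerate_download_failures', 'true')
  # auto_cache_update_enabled() probes this section, so it must exist
  config.add_section(AmbariConfig.AMBARI_PROPERTIES_CATEGORY)

  cache = FileCache(config)
  # Compares custom_actions/.hash with the server copy and re-downloads
  # custom_actions/archive.zip when they differ; with
  # tolerate_download_failures=true, a failed download only logs a warning
  # and the cached copy is used.
  print cache.get_custom_actions_base_dir('http://ambari-server:8080/resources')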