#!/usr/bin/env python
'''
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
'''
import StringIO
import logging
import os
import shutil
import zipfile
import urllib2
import urllib

from AmbariConfig import AmbariConfig

logger = logging.getLogger()

class CachingException(Exception):
  pass

class FileCache():
  """
  Provides caching and lookup for service metadata files.
  If service metadata is not available in the cache,
  downloads the relevant files from the server.
  """

  CLUSTER_CONFIGURATION_CACHE_DIRECTORY = "cluster_configuration"
  ALERTS_CACHE_DIRECTORY = "alerts"
  RECOVERY_CACHE_DIRECTORY = "recovery"
  STACKS_CACHE_DIRECTORY = "stacks"
  COMMON_SERVICES_DIRECTORY = "common-services"
  CUSTOM_ACTIONS_CACHE_DIRECTORY = "custom_actions"
  HOST_SCRIPTS_CACHE_DIRECTORY = "host_scripts"
  HASH_SUM_FILE = ".hash"
  ARCHIVE_NAME = "archive.zip"
  ENABLE_AUTO_AGENT_CACHE_UPDATE_KEY = "agent.auto.cache.update"
  BLOCK_SIZE = 1024 * 16
  SOCKET_TIMEOUT = 10

  def __init__(self, config):
    self.service_component_pool = {}
    self.config = config
    self.cache_dir = config.get('agent', 'cache_dir')
    # Defines whether a command should fail when downloading scripts from
    # the server is not possible, or whether the agent should fall back
    # to the local copy
    self.tolerate_download_failures = \
      config.get('agent', 'tolerate_download_failures').lower() == 'true'
    self.reset()

  def reset(self):
    self.uptodate_paths = []  # Paths that have already been checked recently

  def get_service_base_dir(self, command, server_url_prefix):
    """
    Returns the base directory for a service
    """
    service_subpath = command['commandParams']['service_package_folder']
    return self.provide_directory(self.cache_dir, service_subpath,
                                  server_url_prefix)

  def get_hook_base_dir(self, command, server_url_prefix):
    """
    Returns a base directory for hooks
    """
    try:
      hooks_subpath = command['commandParams']['hooks_folder']
    except KeyError:
      return None
    subpath = os.path.join(self.STACKS_CACHE_DIRECTORY, hooks_subpath)
    return self.provide_directory(self.cache_dir, subpath,
                                  server_url_prefix)
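
  # Example command fragment (hypothetical values) consumed by the two
  # methods above; the real agent receives this structure from the server:
  #   command = {'commandParams': {
  #     'service_package_folder': 'stacks/HDP/2.0.6/services/HDFS/package',
  #     'hooks_folder': 'HDP/2.0.6/hooks'}}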

  def get_custom_actions_base_dir(self, server_url_prefix):
    """
    Returns a base directory for custom action scripts
    """
    return self.provide_directory(self.cache_dir,
                                  self.CUSTOM_ACTIONS_CACHE_DIRECTORY,
                                  server_url_prefix)

  def get_host_scripts_base_dir(self, server_url_prefix):
    """
    Returns a base directory for host scripts (host alerts, etc.),
    i.e. scripts that are not part of the main agent code
    """
    return self.provide_directory(self.cache_dir,
                                  self.HOST_SCRIPTS_CACHE_DIRECTORY,
                                  server_url_prefix)

  def auto_cache_update_enabled(self):
    if self.config and \
        self.config.has_option(AmbariConfig.AMBARI_PROPERTIES_CATEGORY,
                               FileCache.ENABLE_AUTO_AGENT_CACHE_UPDATE_KEY) and \
        self.config.get(AmbariConfig.AMBARI_PROPERTIES_CATEGORY,
                        FileCache.ENABLE_AUTO_AGENT_CACHE_UPDATE_KEY).lower() == "false":
      return False
    return True
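
  # The check above reads an agent property (the section name comes from
  # AmbariConfig.AMBARI_PROPERTIES_CATEGORY). A hypothetical configuration
  # line that would disable automatic cache updates:
  #   agent.auto.cache.update=false
  # Any other value, or an absent key, leaves updates enabled.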

  def provide_directory(self, cache_path, subdirectory, server_url_prefix):
    """
    Ensures that the directory in the cache is up-to-date. Throws a
    CachingException if any problem occurs.

    Parameters:
      cache_path: full path to the cache directory
      subdirectory: subpath inside the cache
      server_url_prefix: url of the "resources" folder on the server
    """
    full_path = os.path.join(cache_path, subdirectory)
    logger.debug("Trying to provide directory {0}".format(subdirectory))
    if not self.auto_cache_update_enabled():
      logger.debug("Auto cache update is disabled.")
      return full_path
    try:
      if full_path not in self.uptodate_paths:
        logger.debug("Checking if update is available for "
                     "directory {0}".format(full_path))
        # Need to check for updates at the server
        remote_url = self.build_download_url(server_url_prefix,
                                             subdirectory, self.HASH_SUM_FILE)
        memory_buffer = self.fetch_url(remote_url)
        remote_hash = memory_buffer.getvalue().strip()
        local_hash = self.read_hash_sum(full_path)
        if not local_hash or local_hash != remote_hash:
          logger.debug("Updating directory {0}".format(full_path))
          download_url = self.build_download_url(server_url_prefix,
                                                 subdirectory, self.ARCHIVE_NAME)
          membuffer = self.fetch_url(download_url)
          # Extract only when the archive is not zero-sized
          if membuffer.getvalue().strip():
            self.invalidate_directory(full_path)
            self.unpack_archive(membuffer, full_path)
            self.write_hash_sum(full_path, remote_hash)
          else:
            logger.warn("Skipping empty archive {0}: the expected archive "
                        "was not found. The cached copy will be used.".format(download_url))
        # Finally, consider the cache directory up-to-date
        self.uptodate_paths.append(full_path)
    except CachingException as e:
      if self.tolerate_download_failures:
        # ignore
        logger.warn("Error occurred during cache update. "
                    "tolerate_download_failures is set to true, so "
                    "this error is ignored and the current cache is used. "
                    "Error details: {0}".format(str(e)))
      else:
        raise  # we do not tolerate download failures; command execution will fail
    return full_path
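
  # Example (hypothetical values): on an agent whose cache_dir is
  # "/var/lib/ambari-agent/cache", a call such as
  #   provide_directory("/var/lib/ambari-agent/cache",
  #                     "stacks/HDP/2.0.6/hooks",
  #                     "http://ambari-server.example.com:8080/resources")
  # compares the remote ".hash" with the local one and, on mismatch,
  # downloads and unpacks "archive.zip" before returning
  # "/var/lib/ambari-agent/cache/stacks/HDP/2.0.6/hooks".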

  def build_download_url(self, server_url_prefix,
                         directory, filename):
    """
    Builds up a proper download url for a file. Used for downloading files
    from the server.

    directory - relative path
    filename - file inside the directory we are trying to fetch
    """
    return "{0}/{1}/{2}".format(server_url_prefix,
                                urllib.pathname2url(directory), filename)
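
  # Example (hypothetical values): urllib.pathname2url escapes characters
  # that are unsafe in URLs, so
  #   build_download_url("http://ambari-server.example.com:8080/resources",
  #                      "custom_actions", "archive.zip")
  # returns
  #   "http://ambari-server.example.com:8080/resources/custom_actions/archive.zip"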

  def fetch_url(self, url):
    """
    Fetches the content at url into an in-memory buffer and returns the
    resulting buffer. May throw exceptions for various reasons.
    """
    logger.debug("Trying to download {0}".format(url))
    try:
      memory_buffer = StringIO.StringIO()
      # An empty ProxyHandler bypasses any proxy configured in the environment
      proxy_handler = urllib2.ProxyHandler({})
      opener = urllib2.build_opener(proxy_handler)
      u = opener.open(url, timeout=self.SOCKET_TIMEOUT)
      logger.debug("Connected to {0} with code {1}".format(u.geturl(),
                                                           u.getcode()))
      # Read the response in BLOCK_SIZE chunks until EOF
      buff = u.read(self.BLOCK_SIZE)
      while buff:
        memory_buffer.write(buff)
        buff = u.read(self.BLOCK_SIZE)
      return memory_buffer
    except Exception as err:
      raise CachingException("Can not download file from"
                             " url {0} : {1}".format(url, str(err)))

  def read_hash_sum(self, directory):
    """
    Tries to read a hash sum from a previously generated file. Returns a
    string containing the hash, or None.
    """
    hash_file = os.path.join(directory, self.HASH_SUM_FILE)
    try:
      with open(hash_file) as fh:
        return fh.readline().strip()
    except (IOError, OSError):
      return None  # A missing or unreadable hash file just means "no local hash"

  def write_hash_sum(self, directory, new_hash):
    """
    Writes the given hash sum to a file in the directory. Raises a
    CachingException on failure.
    """
    hash_file = os.path.join(directory, self.HASH_SUM_FILE)
    try:
      with open(hash_file, "w") as fh:
        fh.write(new_hash)
      os.chmod(hash_file, 0o666)
    except Exception as err:
      raise CachingException("Can not write to file {0} : {1}".format(hash_file,
                                                                      str(err)))

  def invalidate_directory(self, directory):
    """
    Recursively removes the directory content (if any). Also creates the
    directory and any parent directories if needed. May throw exceptions
    on permission problems.
    """
    logger.debug("Invalidating directory {0}".format(directory))
    try:
      if os.path.exists(directory):
        if os.path.isfile(directory):  # It would be a strange situation
          os.unlink(directory)
        elif os.path.isdir(directory):
          shutil.rmtree(directory)
      # Create the directory itself and any parent directories
      os.makedirs(directory)
    except Exception as err:
      raise CachingException("Can not invalidate cache directory {0}: {1}"
                             .format(directory, str(err)))

  def unpack_archive(self, mem_buffer, target_directory):
    """
    Unpacks the contents of an in-memory buffer to the file system.
    The in-memory buffer is expected to contain a valid zip archive.
    """
    try:
      zfile = zipfile.ZipFile(mem_buffer)
      for name in zfile.namelist():
        (dirname, filename) = os.path.split(name)
        concrete_dir = os.path.abspath(os.path.join(target_directory, dirname))
        if not os.path.isdir(concrete_dir):
          os.makedirs(concrete_dir)
        logger.debug("Unpacking file {0} to {1}".format(name, concrete_dir))
        if filename != '':
          zfile.extract(name, target_directory)
    except Exception as err:
      raise CachingException("Can not unpack zip file to "
                             "directory {0} : {1}".format(target_directory,
                                                          str(err)))
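

# ---------------------------------------------------------------------------
# Minimal usage sketch (illustrative only). The server URL below is a
# hypothetical value; a real agent derives the "resources" prefix from its
# registration with the Ambari server, and AmbariConfig is assumed to supply
# cache_dir and tolerate_download_failures from the agent configuration.
# ---------------------------------------------------------------------------
if __name__ == '__main__':
  logging.basicConfig(level=logging.DEBUG)
  config = AmbariConfig()
  cache = FileCache(config)
  server_url_prefix = "http://ambari-server.example.com:8080/resources"
  try:
    host_scripts_dir = cache.get_host_scripts_base_dir(server_url_prefix)
    logger.info("Host scripts are available at {0}".format(host_scripts_dir))
  except CachingException as e:
    logger.error("Cache update failed: {0}".format(str(e)))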