|
@@ -69,11 +69,12 @@ Example:
|
|
|
},
|
|
|
"test_property": {
|
|
|
"value": "new value",
|
|
|
- "override: "no", (optional, override already existed property yes/no)
|
|
|
+ "override: "no", (optional, override already existed property yes/no. Default: yes)
|
|
|
"value-required": "old value", (optional, property would be set if the required value is present)
|
|
|
"can-create": "no", (optional, process property only if that property present on the server.
|
|
|
- i.e. ability to create new property, default yes)
|
|
|
- "required-services": ["HDFS", "YARN"] (optional, process property only if selected services existed)
|
|
|
+ i.e. ability to create new property. Default: yes)
|
|
|
+ "required-services": ["HDFS", "YARN"], (optional, process property only if selected services existed)
|
|
|
+ "resolve-dependency": "no" (optional, use Stack Advisor to get depended properties changes. Default: no)
|
|
|
}
|
|
|
}
|
|
|
},
|
|
@@ -92,7 +93,7 @@ Example:
|
|
|
)
|
|
|
"replace-from": "something", (optional, should be present both from and to. Replace 'from' value to 'to')
|
|
|
"replace-to": "something,
|
|
|
- "required-services": ["YARN"] (optional, process entry if services in the list existed on the cluster
|
|
|
+ "required-services": ["YARN"], (optional, process entry if services in the list existed on the cluster
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -172,6 +173,10 @@ class PropertyNotFoundException(Exception):
|
|
|
pass
|
|
|
|
|
|
|
|
|
+class StackNotFoundException(Exception):
|
|
|
+ pass
|
|
|
+
|
|
|
+
|
|
|
class MalformedPropertyDefinitionException(Exception):
|
|
|
pass
|
|
|
|
|
@@ -218,6 +223,8 @@ class Options(Const):
|
|
|
logger = None
|
|
|
server_config_factory = None
|
|
|
""":type : ServerConfigFactory"""
|
|
|
+ stack_advisor = None
|
|
|
+ """:type : StackAdvisor"""
|
|
|
|
|
|
# Api constants
|
|
|
ROOT_URL = None
|
|
@@ -253,9 +260,16 @@ class Options(Const):
|
|
|
cls.CLUSTER_URL = cls.ROOT_URL + "/clusters/%s" % cls.CLUSTER_NAME
|
|
|
cls.COMPONENTS_FORMAT = cls.CLUSTER_URL + "/components/{0}"
|
|
|
cls.TEZ_VIEW_URL = cls.ROOT_URL + "/views/TEZ"
|
|
|
+ cls.STACKS_URL = cls.ROOT_URL + "/stacks"
|
|
|
+ cls.STACKS_VERSIONS_URL = cls.STACKS_URL + "/{0}/versions"
|
|
|
+ cls.STACK_ADVISOR_URL = cls.STACKS_VERSIONS_URL + "/{1}/recommendations"
|
|
|
+ cls.AMBARI_SERVER_URL = cls.ROOT_URL + "/services/AMBARI/components/AMBARI_SERVER"
|
|
|
+ cls.AMBARI_AGENTS_URL = cls.ROOT_URL + "/services/AMBARI/components/AMBARI_AGENT"
|
|
|
if cls.CLUSTER_NAME is not None and cls.HOST is not None:
|
|
|
cls.SERVICES = set(map(lambda x: x.upper(), get_cluster_services()))
|
|
|
|
|
|
+ cls.ambari_server = AmbariServer()
|
|
|
+
|
|
|
@classmethod
|
|
|
def initialize_logger(cls, filename=None):
|
|
|
cls.logger = logging.getLogger('UpgradeHelper')
|
|
@@ -284,6 +298,7 @@ class CatConst(Const):
|
|
|
STACK_PROPERTIES = "properties"
|
|
|
STACK_PROPERTIES_ATTRIBUTES = "properties_attributes"
|
|
|
PROPERTY_VALUE_TAG = "value"
|
|
|
+ VERSIONS_TAG = "versions"
|
|
|
PROPERTY_REMOVE_TAG = "remove"
|
|
|
PROPERTY_MAP_TO = "map-to"
|
|
|
PROPERTY_MAP_FROM = "map-from"
|
|
@@ -293,6 +308,7 @@ class CatConst(Const):
|
|
|
MERGED_COPY_TAG = "merged-copy"
|
|
|
REQUIRED_SERVICES = "required-services"
|
|
|
COERCE_TO_PROPERTY_TAG = "coerce-to"
|
|
|
+ RESOLVE_DEPENDENCY_TAG = "resolve-dependency"
|
|
|
COERCE_YAML_OPTION_TAG = "yaml-array"
|
|
|
REPLACE_FROM_TAG = "replace-from"
|
|
|
REPLACE_TO_TAG = "replace-to"
|
|
@@ -314,6 +330,179 @@ class CatConst(Const):
|
|
|
# ==============================
|
|
|
# Catalog classes definition
|
|
|
# ==============================
|
|
|
+
|
|
|
+class AmbariServer(object):
|
|
|
+ def __init__(self):
|
|
|
+ Options.logger.info("Resolving Ambari server configuration ...")
|
|
|
+ self._get_server_info()
|
|
|
+ self._get_agents_info()
|
|
|
+
|
|
|
+ def _get_server_info(self):
|
|
|
+ info = curl(Options.AMBARI_SERVER_URL, parse=True)
|
|
|
+ self._server_version = [0, 0, 0]
|
|
|
+
|
|
|
+ if "RootServiceComponents" in info:
|
|
|
+ server_props = info["RootServiceComponents"]
|
|
|
+ ver = server_props["component_version"] if "component_version" in server_props else None
|
|
|
+ try:
|
|
|
+ self._server_version = list(map(lambda x: int(x), ver.split(".")))
|
|
|
+ except ValueError:
|
|
|
+ pass
|
|
|
+
|
|
|
+ def _get_agents_info(self):
|
|
|
+ info = curl(Options.AMBARI_AGENTS_URL, parse=True)
|
|
|
+ self._agents = []
|
|
|
+ if "hostComponents" in info:
|
|
|
+ agent_props = info["hostComponents"]
|
|
|
+ self._agents = list(map(lambda x: x["RootServiceHostComponents"]["host_name"], agent_props))
|
|
|
+
|
|
|
+ @property
|
|
|
+ def server_version(self):
|
|
|
+ return self._server_version
|
|
|
+
|
|
|
+ @property
|
|
|
+ def agent_hosts(self):
|
|
|
+ return self._agents
|
|
|
+
|
|
|
+class StackAdvisorFactory(object):
|
|
|
+ def __init__(self):
|
|
|
+ self._stack_info = self._load_stack_info()
|
|
|
+
|
|
|
+ def _load_stack_versions(self, stack):
|
|
|
+ versions = curl(Options.STACKS_VERSIONS_URL.format(stack), parse=True)
|
|
|
+ if CatConst.ITEMS_TAG in versions:
|
|
|
+ versions = list(map(lambda x: x["Versions"]["stack_version"], versions[CatConst.ITEMS_TAG]))
|
|
|
+
|
|
|
+ return versions
|
|
|
+
|
|
|
+ def _load_stack_info(self):
|
|
|
+ stacks = curl(Options.STACKS_URL, parse=True)
|
|
|
+ if CatConst.ITEMS_TAG in stacks:
|
|
|
+ stacks = list(map(lambda x: x["Stacks"]["stack_name"], stacks["items"]))
|
|
|
+ else:
|
|
|
+ stacks = {}
|
|
|
+
|
|
|
+ stacks_dict = {}
|
|
|
+
|
|
|
+ for stack in stacks:
|
|
|
+ stacks_dict[stack] = self._load_stack_versions(stack)
|
|
|
+
|
|
|
+ return stacks_dict
|
|
|
+
|
|
|
+ def get_instance(self, stack, version):
|
|
|
+ sversion = Options.ambari_server.server_version
|
|
|
+ if sversion[0] * 10 + sversion[1] < 21:
|
|
|
+ Options.logger.warning("Ambari server version \"%s.%s.%s\" doesn't support property dependencies suggestion" %
|
|
|
+ (sversion[0], sversion[1], sversion[2]))
|
|
|
+ return BaseStackAdvisor(stack, version)
|
|
|
+
|
|
|
+ if stack in self._stack_info and version in self._stack_info[stack]:
|
|
|
+ return StackAdvisor(stack, version)
|
|
|
+ else:
|
|
|
+ raise StackNotFoundException("Stack %s-%s not exist on the server" % (stack, version))
|
|
|
+
|
|
|
+class StackAdvisorRequestProperty(object):
|
|
|
+ def __init__(self, catalog, property_name):
|
|
|
+ self._catalog = catalog
|
|
|
+ self._property_name = property_name
|
|
|
+
|
|
|
+ @property
|
|
|
+ def catalog(self):
|
|
|
+ return self._catalog
|
|
|
+
|
|
|
+ @property
|
|
|
+ def name(self):
|
|
|
+ return self._property_name
|
|
|
+
|
|
|
+ def get_json(self):
|
|
|
+ return {
|
|
|
+ "type": self.catalog,
|
|
|
+ "name": self.name
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+class BaseStackAdvisor(object):
|
|
|
+ def __init__(self, stack, version):
|
|
|
+ self._req_url = Options.STACK_ADVISOR_URL.format(stack, version)
|
|
|
+
|
|
|
+ def get_suggestion(self, cfg_factory, changed_properties):
|
|
|
+ return {}
|
|
|
+
|
|
|
+
|
|
|
+class StackAdvisor(BaseStackAdvisor):
|
|
|
+ def __init__(self, stack, version):
|
|
|
+ super(StackAdvisor, self).__init__(stack, version)
|
|
|
+
|
|
|
+ def _transform_properties(self, cfg_factory):
|
|
|
+ """
|
|
|
+ Transform properties list to blueprint output format
|
|
|
+ :type cfg_factory: ServerConfigFactory
|
|
|
+ :rtype dict
|
|
|
+ """
|
|
|
+ props = cfg_factory.get_json()
|
|
|
+ for cfg in props:
|
|
|
+ props[cfg] = {
|
|
|
+ "properties": props[cfg]
|
|
|
+ }
|
|
|
+
|
|
|
+ return props
|
|
|
+
|
|
|
+ def _from_blueprint_properties_transform(self, props):
|
|
|
+ """
|
|
|
+ Transform SA response to dict
|
|
|
+ """
|
|
|
+ for p in props:
|
|
|
+ rprop = {}
|
|
|
+ if "properties" in props[p] and props[p]["properties"] is not None:
|
|
|
+ rprop = props[p]["properties"]
|
|
|
+ if "property_attributes" in props[p]:
|
|
|
+ for property_attribute in props[p]["property_attributes"]:
|
|
|
+ if "delete" in props[p]["property_attributes"][property_attribute] and \
|
|
|
+ props[p]["property_attributes"][property_attribute]["delete"] == "true":
|
|
|
+ rprop[property_attribute] = None
|
|
|
+
|
|
|
+ props[p] = rprop
|
|
|
+
|
|
|
+ return props
|
|
|
+
|
|
|
+ def _generate_req_properties(self, properties):
|
|
|
+ rlist = []
|
|
|
+ for item in properties:
|
|
|
+ if isinstance(item, StackAdvisorRequestProperty):
|
|
|
+ rlist.append(item.get_json())
|
|
|
+ return rlist
|
|
|
+
|
|
|
+ def get_suggestion(self, cfg_factory, changed_properties):
|
|
|
+ """
|
|
|
+ :type cfg_factory: ServerConfigFactory
|
|
|
+ :type catalog_name str
|
|
|
+ :type changed_properties: list
|
|
|
+ :rtype dict
|
|
|
+ """
|
|
|
+ request = {
|
|
|
+ "recommend": "configuration-dependencies",
|
|
|
+ "hosts": Options.ambari_server.agent_hosts,
|
|
|
+ "services": list(Options.SERVICES),
|
|
|
+ "changed_configurations": self._generate_req_properties(changed_properties),
|
|
|
+ "recommendations": {
|
|
|
+ "blueprint": {
|
|
|
+ "host_groups": [],
|
|
|
+ "configurations": self._transform_properties(cfg_factory),
|
|
|
+ "blueprint_cluster_binding": {}
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ response = curl(self._req_url, request_type="POST", data=request, parse=True)
|
|
|
+ if "resources" in response and isinstance(response["resources"], list) and len(response["resources"]) > 0:
|
|
|
+ response = response["resources"][0]
|
|
|
+ if "recommendations" in response and "blueprint" in response["recommendations"] and \
|
|
|
+ "configurations" in response["recommendations"]["blueprint"]:
|
|
|
+ return self._from_blueprint_properties_transform(response["recommendations"]["blueprint"]["configurations"])
|
|
|
+
|
|
|
+ return {}
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
class UpgradeCatalogFactory(object):
|
|
|
# versions of catalog which is currently supported
|
|
|
_supported_catalog_versions = ["1.0"]
|
|
@@ -433,6 +622,14 @@ class UpgradeCatalog(object):
|
|
|
def version(self):
|
|
|
return "%s-%s" % (self._version[CatConst.STACK_VERSION_OLD], self._version[CatConst.STACK_VERSION_TARGET])
|
|
|
|
|
|
+ @property
|
|
|
+ def target_version(self):
|
|
|
+ return self._version[CatConst.STACK_VERSION_TARGET]
|
|
|
+
|
|
|
+ @property
|
|
|
+ def source_version(self):
|
|
|
+ return self._version[CatConst.STACK_VERSION_OLD]
|
|
|
+
|
|
|
def get_parsed_version(self):
|
|
|
"""
|
|
|
Get numeric representation of the version for comparation purposes
|
|
@@ -486,8 +683,9 @@ class UpgradeCatalog(object):
|
|
|
def tag_search_pattern(self):
|
|
|
return self._search_pattern
|
|
|
|
|
|
- def __handle_remove_tag(self, catalog_item_name, catalog_property_item, properties):
|
|
|
+ def __handle_remove_tag(self, name, catalog_item_name, catalog_property_item, properties):
|
|
|
"""
|
|
|
+ :type name str
|
|
|
:type catalog_item_name str
|
|
|
:type catalog_property_item dict
|
|
|
:type properties dict
|
|
@@ -515,8 +713,9 @@ class UpgradeCatalog(object):
|
|
|
except TemplateProcessingException:
|
|
|
pass
|
|
|
|
|
|
- def __handle_add_new(self, catalog_item_name, catalog_property_item, properties):
|
|
|
+ def __handle_add_new(self, name, catalog_item_name, catalog_property_item, properties):
|
|
|
"""
|
|
|
+ :type name str
|
|
|
:type catalog_item_name str
|
|
|
:type catalog_property_item dict
|
|
|
:type properties dict
|
|
@@ -528,8 +727,9 @@ class UpgradeCatalog(object):
|
|
|
self.__handle_template_tag_sub(catalog_item_name, catalog_property_item)
|
|
|
properties[catalog_item_name] = catalog_property_item[CatConst.PROPERTY_VALUE_TAG]
|
|
|
|
|
|
- def __handle_change_existing(self, catalog_item_name, catalog_property_item, properties):
|
|
|
+ def __handle_change_existing(self, name, catalog_item_name, catalog_property_item, properties):
|
|
|
"""
|
|
|
+ :type name str
|
|
|
:type catalog_item_name str
|
|
|
:type catalog_property_item dict
|
|
|
:type properties dict
|
|
@@ -545,6 +745,31 @@ class UpgradeCatalog(object):
|
|
|
properties[catalog_item_name] = catalog_property_item[CatConst.PROPERTY_VALUE_TAG]
|
|
|
return properties
|
|
|
|
|
|
+ def __handle_dependency_tag(self, name, catalog_item_name, catalog_property_item, properties):
|
|
|
+ """
|
|
|
+ :type name str
|
|
|
+ :type catalog_item_name str
|
|
|
+ :type catalog_property_item dict
|
|
|
+ :type properties dict
|
|
|
+ """
|
|
|
+ if CatConst.RESOLVE_DEPENDENCY_TAG in catalog_property_item and \
|
|
|
+ catalog_property_item[CatConst.RESOLVE_DEPENDENCY_TAG] == CatConst.TRUE_TAG:
|
|
|
+ sa_suggestions = Options.stack_advisor.get_suggestion(Options.server_config_factory,
|
|
|
+ [StackAdvisorRequestProperty(name, catalog_item_name)])
|
|
|
+ for sa_catalog in sa_suggestions:
|
|
|
+ # create new config group if not existed
|
|
|
+ if sa_catalog not in Options.server_config_factory.items():
|
|
|
+ Options.server_config_factory.create_config(sa_catalog)
|
|
|
+
|
|
|
+ catalog_properties = Options.server_config_factory.get_config(sa_catalog).properties
|
|
|
+ for sa_property in sa_suggestions[sa_catalog]:
|
|
|
+ if sa_suggestions[sa_catalog][sa_property] is None and sa_property in catalog_properties:
|
|
|
+ print "rem %s:%s" % (sa_catalog, sa_property)
|
|
|
+ del catalog_properties[sa_property]
|
|
|
+ elif sa_suggestions[sa_catalog][sa_property] is not None:
|
|
|
+ catalog_properties[sa_property] = sa_suggestions[sa_catalog][sa_property]
|
|
|
+
|
|
|
+
|
|
|
def __can_handler_execute(self, catalog_options, catalog_property_item, property_item, properties):
|
|
|
"""
|
|
|
:type catalog_options dict
|
|
@@ -582,6 +807,7 @@ class UpgradeCatalog(object):
|
|
|
tag_handlers = [
|
|
|
self.__handle_add_new,
|
|
|
self.__handle_change_existing,
|
|
|
+ self.__handle_dependency_tag,
|
|
|
self.__handle_remove_tag
|
|
|
]
|
|
|
# catalog has no update entries for this config group
|
|
@@ -593,7 +819,7 @@ class UpgradeCatalog(object):
|
|
|
catalog_options = self.options[name] if name in self.options else {}
|
|
|
if self.__can_handler_execute(catalog_options, catalog_property_item, catalog_item[catalog_property_item], properties):
|
|
|
for handler in tag_handlers:
|
|
|
- handler(catalog_property_item, catalog_item[catalog_property_item], properties)
|
|
|
+ handler(name, catalog_property_item, catalog_item[catalog_property_item], properties)
|
|
|
|
|
|
|
|
|
class PropertyMapping(object):
|
|
@@ -661,6 +887,19 @@ class ServerConfigFactory(object):
|
|
|
if config_item is not None and name == _name and name in self._server_catalogs:
|
|
|
config_item.notify(action, arg)
|
|
|
|
|
|
+ def __str__(self):
|
|
|
+ catalogs = {}
|
|
|
+ for cfg in self._server_catalogs:
|
|
|
+ catalogs[cfg] = str(self._server_catalogs[cfg])
|
|
|
+
|
|
|
+ return json.dumps(catalogs)
|
|
|
+
|
|
|
+ def get_json(self):
|
|
|
+ catalogs = {}
|
|
|
+ for cfg in self._server_catalogs:
|
|
|
+ catalogs[cfg] = self._server_catalogs[cfg].properties
|
|
|
+
|
|
|
+ return catalogs
|
|
|
def get_config(self, name):
|
|
|
"""
|
|
|
Get configuration item object
|
|
@@ -831,6 +1070,9 @@ class ServerConfig(object):
|
|
|
def is_attributes_exists(self):
|
|
|
return CatConst.STACK_PROPERTIES_ATTRIBUTES in self._configs
|
|
|
|
|
|
+ def __str__(self):
|
|
|
+ return json.dumps(self.properties)
|
|
|
+
|
|
|
@property
|
|
|
def properties(self):
|
|
|
return self._configs[CatConst.STACK_PROPERTIES]
|
|
@@ -1497,6 +1739,7 @@ def modify_configs():
|
|
|
catalog_farm = UpgradeCatalogFactory(Options.OPTIONS.upgrade_json) # Load upgrade catalog
|
|
|
catalog = catalog_farm.get_catalog(Options.OPTIONS.from_stack,
|
|
|
Options.OPTIONS.to_stack) # get desired version of catalog
|
|
|
+ Options.stack_advisor = StackAdvisorFactory().get_instance(catalog.name, catalog.target_version)
|
|
|
|
|
|
# load all desired configs from the server
|
|
|
# ToDo: implement singleton for that class
|
|
@@ -1850,7 +2093,7 @@ def main():
|
|
|
Options.INSTALL_YARN_MR2_ACTION: install_services,
|
|
|
Options.BACKUP_CONFIG_ACTION: backup_configs,
|
|
|
Options.VERIFY_ACTION: verify_configuration
|
|
|
- }
|
|
|
+ }
|
|
|
|
|
|
parser = optparse.OptionParser(usage="usage: %prog [options] action\n Valid actions: "
|
|
|
+ ", ".join(action_list.keys())
|