Forráskód Böngészése

AMBARI-12166 Need feature in upgradeHelper.py to process configuration item only if service is exist in the cluster (dsen)

Dmytro Sen 10 éve
szülő
commit
4221fb9bc0

+ 80 - 2
ambari-server/src/main/python/upgradeHelper.py

@@ -34,6 +34,8 @@ Sub-section options:
   config-types - contains global per-config settings
     merged-copy - would merge latest server properties with properties defined in "properties" section,
                   without this option server properties would be rewritten by properties defined in "properties" section
+    required-services - properties from json catalog would be processed only if desired services are present on the cluster
+                        property level definition will always override catalog level definition.
 
 Sub-section properties - Contains property definition
 Sub-section property-mapping(optional) - contains mapping of property names in case, if some property changed their name in NEWVERSION
@@ -50,7 +52,8 @@ Example:
       "options": {
         "config-types": {
           "CONFIGTYPE1": {
-            "merged-copy": "yes"
+            "merged-copy": "yes",
+            "required-services": ["HDFS"]
           }
         }
       },
@@ -62,7 +65,8 @@ Example:
           },
           "template_property": {
            "value": "{TEMPLATE_TAG}",
-           "template": "yes"
+           "template": "yes",
+           "required-services": ["HDFS", "YARN"]
           }
         }
       },
@@ -145,6 +149,7 @@ class CatConst(Const):
   PROPERTY_VALUE_TAG = "value"
   PROPERTY_REMOVE_TAG = "remove"
   MERGED_COPY_TAG = "merged-copy"
+  REQUIRED_SERVICES = "required-services"
   ITEMS_TAG = "items"
   TYPE_TAG = "type"
   TRUE_TAG = "yes"
@@ -196,6 +201,8 @@ class Options(Const):
   # for verify action
   REPORT_FILE = None
 
+  SERVICES = []
+
   API_TOKENS = {
     "user": None,
     "pass": None
@@ -210,6 +217,8 @@ class Options(Const):
     cls.ROOT_URL = '%s://%s:%s/api/v1' % (cls.API_PROTOCOL, cls.HOST, cls.API_PORT)
     cls.CLUSTER_URL = cls.ROOT_URL + "/clusters/%s" % cls.CLUSTER_NAME
     cls.COMPONENTS_FORMAT = cls.CLUSTER_URL + "/components/{0}"
+    if cls.CLUSTER_NAME is not None and cls.HOST is not None:
+      cls.SERVICES = set(map(lambda x: x.upper(), get_cluster_services()))
 
   @classmethod
   def initialize_logger(cls, filename=None):
@@ -431,6 +440,11 @@ class ConfigConst(object):
     return self._config_types_value_definition.keys()
 
   def get(self, name):
+    """
+    Return desired property catalog
+    :param name: str
+    :return: dict
+    """
     if name in self._config_types_value_definition:
       return self._config_types_value_definition[name]
     raise Exception("No config group with name %s found" % name)
@@ -712,9 +726,70 @@ def get_config_resp_all():
 
   return desired_configs
 
+def is_services_exists(required_services):
+  """
+  return true, if required_services is a part of Options.SERVICES
+  :param required_services: list
+  :return: bool
+  """
+  # sets are equal
+  if Options.SERVICES == set(required_services):
+    return True
+
+  return set(map(lambda x: x.upper(), required_services)) < Options.SERVICES
+
+def get_cluster_services():
+  services_url = Options.CLUSTER_URL + '/services'
+  raw_services = curl(services_url, parse=True, simulate=False)
+
+  # expected structure:
+  # items: [ {"href":"...", "ServiceInfo":{"cluster_name":"..", "service_name":".."}}, ..., ... ]
+  if raw_services is not None and "items" in raw_services and isinstance(raw_services["items"], list):
+    return list(map(lambda item: item["ServiceInfo"]["service_name"], raw_services["items"]))
+
+  Options.logger.warning("Failed to load services list, functionality that depends on them couldn't work")
+  return []
+
+def filter_properties_by_service_presence(config_type, catalog, catalog_properties):
+  """
+  Filter properties by required-services tag.
+  required-services tag could be catalog to per-property defined. per-property definition
+  will always override per-catalog definition.
+  :param config_type: str
+  :param catalog: UpgradeCatalog
+  :param catalog_properties: dict
+  :return: dict
+  """
+  cproperties = dict(catalog_properties)
+  catalog_required_services = []
+  del_props = []
+
+  #  do nothing
+  if CatConst.REQUIRED_SERVICES in catalog.config_groups.get(config_type) and \
+    isinstance(catalog.config_groups.get(config_type)[CatConst.REQUIRED_SERVICES], list):
+    catalog_required_services = catalog.config_groups.get(config_type)[CatConst.REQUIRED_SERVICES]
+
+  for prop_name in cproperties:
+    # set per catalog limitation
+    required_services = catalog_required_services
+    if CatConst.REQUIRED_SERVICES in cproperties[prop_name] and\
+      isinstance(cproperties[prop_name][CatConst.REQUIRED_SERVICES], list):
+      # set per property limitation
+      required_services = catalog_properties[prop_name][CatConst.REQUIRED_SERVICES]
+
+    # add property to list for remove
+    if not is_services_exists(required_services):
+      del_props.append(prop_name)
+
+  # remove properties
+  for prop in del_props:
+    del cproperties[prop]
+
+  return cproperties
 
 def modify_config_item(config_type, catalog):
   #  here should be declared tokens for pattern replace
+
   if catalog.get_parsed_version()["from"] == 13:  # ToDo: introduce class for pre-defined tokens
     hostmapping = read_mapping()
     jt_host = hostmapping["JOBTRACKER"][0]
@@ -746,6 +821,9 @@ def modify_config_item(config_type, catalog):
   is_merged_copy = CatConst.MERGED_COPY_TAG in catalog.config_groups.get(config_type) \
    and catalog.config_groups.get(config_type)[CatConst.MERGED_COPY_TAG] == CatConst.TRUE_TAG
 
+  # filter properties by service-required tag
+  properties_copy = filter_properties_by_service_presence(config_type, catalog, properties_copy)
+
   # ToDo: implement property transfer from one catalog to other
   #   properties_to_move = [
   #     "dfs.namenode.checkpoint.edits.dir",

+ 45 - 4
ambari-server/src/test/python/TestUpgradeHelper.py

@@ -47,6 +47,7 @@ class TestUpgradeHelper(TestCase):
   catalog_from = "1.3"
   catalog_to = "2.2"
   catalog_cfg_type = "my type"
+  required_service = "TEST"
   test_catalog = """{
    "version": "1.0",
    "stacks": [
@@ -63,7 +64,10 @@ class TestUpgradeHelper(TestCase):
        },
        "properties": {
          "%s": {
-           "my property": "my value"
+           "my property": {
+             "value": "my value",
+             "required-services": [\"%s\"]
+           }
          }
        },
        "property-mapping": {
@@ -76,7 +80,9 @@ class TestUpgradeHelper(TestCase):
 
   def setUp(self):
     # replace original curl call to mock
-    self.test_catalog = self.test_catalog % (self.catalog_from, self.catalog_to, self.catalog_cfg_type, self.catalog_cfg_type)
+    self.test_catalog = self.test_catalog % (self.catalog_from, self.catalog_to,
+                                             self.catalog_cfg_type, self.catalog_cfg_type,
+                                             self.required_service)
 
     self.original_curl = upgradeHelper.curl
     upgradeHelper.curl = self.magic_curl
@@ -134,6 +140,35 @@ class TestUpgradeHelper(TestCase):
     self.assertEqual({"user": options.user, "pass": options.password}, upgradeHelper.Options.API_TOKENS)
     self.assertEqual(options.clustername, upgradeHelper.Options.CLUSTER_NAME)
 
+  def test_is_services_exists(self):
+    old_services = upgradeHelper.Options.SERVICES
+
+    upgradeHelper.Options.SERVICES = set(['TEST1', 'TEST2'])
+    actual_result = upgradeHelper.is_services_exists(['TEST1'])
+
+    # check for situation with two empty sets
+    upgradeHelper.Options.SERVICES = set()
+    actual_result_1 = upgradeHelper.is_services_exists([])
+
+    upgradeHelper.Options.SERVICES = old_services
+
+    self.assertEqual(True, actual_result)
+    self.assertEqual(True, actual_result_1)
+
+  @patch.object(upgradeHelper, "is_services_exists")
+  def test_filter_properties_by_service_presence(self, is_service_exists_mock):
+    catalog_factory = UpgradeCatalogFactoryMock(self.test_catalog)
+    catalog = catalog_factory.get_catalog(self.catalog_from, self.catalog_to)
+    cfg_type = self.catalog_cfg_type
+    is_service_exists_mock.return_value = True
+
+    old_services = upgradeHelper.Options.SERVICES
+    upgradeHelper.Options.SERVICES = set([self.required_service])
+    actual_result = upgradeHelper.filter_properties_by_service_presence(cfg_type, catalog, catalog.get_properties(self.catalog_cfg_type))
+
+    upgradeHelper.Options.SERVICES = old_services
+    self.assertEqual(catalog.get_properties(self.catalog_cfg_type), actual_result)
+
   @patch("__builtin__.open")
   @patch.object(os.path, "isfile")
   @patch("os.remove")
@@ -640,6 +675,8 @@ class TestUpgradeHelper(TestCase):
                               get_config_mock, read_mapping_mock):
     catalog_factory = UpgradeCatalogFactoryMock(self.test_catalog)
     get_config_resp_mock.return_value = "", {}
+    old_services = upgradeHelper.Options.SERVICES
+    upgradeHelper.Options.SERVICES = set([self.required_service])
     catalog = catalog_factory.get_catalog(self.catalog_from, self.catalog_to)
     cfg_type = self.catalog_cfg_type
     read_mapping_mock.return_value = {
@@ -653,7 +690,8 @@ class TestUpgradeHelper(TestCase):
       cfg_type,
       {
         "my property": {
-          "value": "my value"
+          "value": "my value",
+          "required-services": ["TEST"]
         }
       },
       {
@@ -664,6 +702,8 @@ class TestUpgradeHelper(TestCase):
     # execute testing function
     upgradeHelper.modify_config_item(cfg_type, catalog)
 
+    upgradeHelper.Options.SERVICES = old_services
+
     actual_params = [
       update_config_using_existing_properties_mock.call_args[0][0],
       update_config_using_existing_properties_mock.call_args[0][1],
@@ -860,7 +900,8 @@ class TestUpgradeHelper(TestCase):
     expected_result = [
       {
         'catalog_item': {
-          'value': u'my value'
+          'value': u'my value',
+          'required-services': [u'TEST']
         },
         'property': 'my property',
         'actual_value': {