
AMBARI-17751. Fix the size-calculation logic for multi-level YARN queues in the LLAP config calculations. Also added a validation check for 'hive.server2.tez.sessions.per.default.queue'.

Swapan Shridhar 9 years ago
parent
commit
4f7f32a09f

+ 89 - 36
ambari-server/src/main/resources/stacks/HDP/2.5/services/stack_advisor.py

@@ -167,7 +167,8 @@ class HDP25StackAdvisor(HDP24StackAdvisor):
          and Hive2 app.
       2. Queue selected in 'hive.llap.daemon.queue.name' config state should not be 'STOPPED'.
       3. 'hive.server2.enable.doAs' config should be set to 'false' for Hive2.
-      4. if 'llap' queue is selected, in order to run Service Checks, 'remaining available capacity' in cluster is atleast 512 MB.
+      4. 'Maximum Total Concurrent Queries' (hive.server2.tez.sessions.per.default.queue) should not consume more than 50% of the queue selected for LLAP.
+      5. If the 'llap' queue is selected, the 'remaining available capacity' in the cluster must be at least 512 MB in order to run Service Checks.
   """
   def validateHiveInteractiveSiteConfigurations(self, properties, recommendedDefaults, configurations, services, hosts):
     validationItems = []
@@ -175,6 +176,14 @@ class HDP25StackAdvisor(HDP24StackAdvisor):
     curr_selected_queue_for_llap = None
     curr_selected_queue_for_llap_cap_perc =  None
     MIN_ASSUMED_CAP_REQUIRED_FOR_SERVICE_CHECKS = 512
+    current_selected_queue_for_llap_cap =  None
+
+    # Get total cluster capacity
+    node_manager_host_list = self.get_node_manager_hosts(services, hosts)
+    node_manager_cnt = len(node_manager_host_list)
+    yarn_nm_mem_in_mb = self.get_yarn_nm_mem_in_mb(services, configurations)
+    total_cluster_capacity = node_manager_cnt * yarn_nm_mem_in_mb
+
     if len(hsi_hosts) > 0:
       capacity_scheduler_properties, received_as_key_value_pair = self.getCapacitySchedulerProperties(services)
       if capacity_scheduler_properties:
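
For reference, the total cluster capacity hoisted to the top of the validator here is simply the NodeManager count multiplied by the per-NodeManager memory. A minimal sketch with hypothetical values:

# Hypothetical cluster: 5 NodeManagers, each offering 20480 MB to YARN containers.
node_manager_cnt = 5
yarn_nm_mem_in_mb = 20480
total_cluster_capacity = node_manager_cnt * yarn_nm_mem_in_mb
print(total_cluster_capacity)  # 102400 (MB)
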
@@ -182,9 +191,10 @@ class HDP25StackAdvisor(HDP24StackAdvisor):
             'hive.llap.daemon.queue.name' in services['configurations'][self.HIVE_INTERACTIVE_SITE]['properties']:
           curr_selected_queue_for_llap = services['configurations'][self.HIVE_INTERACTIVE_SITE]['properties']['hive.llap.daemon.queue.name']
           if curr_selected_queue_for_llap:
-            curr_selected_queue_for_llap_cap_perc = self.__getQueueCapFromCapacityScheduler(capacity_scheduler_properties, curr_selected_queue_for_llap)
-            if curr_selected_queue_for_llap_cap_perc:
-              curr_selected_queue_for_llap_cap_perc = int(float(curr_selected_queue_for_llap_cap_perc))
+            current_selected_queue_for_llap_cap = self.__getSelectedQueueTotalCap(capacity_scheduler_properties,
+                                                                                  curr_selected_queue_for_llap, total_cluster_capacity)
+            if current_selected_queue_for_llap_cap:
+              curr_selected_queue_for_llap_cap_perc = int(current_selected_queue_for_llap_cap * 100 / total_cluster_capacity)
               min_reqd_queue_cap_perc = self.min_queue_perc_reqd_for_llap_and_hive_app(services, hosts, configurations)
 
              # Validate that the selected queue in 'hive.llap.daemon.queue.name' is sized >= the minimum required
@@ -218,29 +228,39 @@ class HDP25StackAdvisor(HDP24StackAdvisor):
         Logger.error("Couldn't retrieve 'capacity-scheduler' properties while doing validation checks for Hive Server Interactive.")
         pass
 
-      # Validate that 'hive.server2.enable.doAs' config is not set to 'true' for Hive2.
       if self.HIVE_INTERACTIVE_SITE in services['configurations']:
+        # Validate that 'hive.server2.enable.doAs' config is not set to 'true' for Hive2.
         if 'hive.server2.enable.doAs' in services['configurations'][self.HIVE_INTERACTIVE_SITE]['properties']:
           hive2_enable_do_as = services['configurations'][self.HIVE_INTERACTIVE_SITE]['properties']['hive.server2.enable.doAs']
           if hive2_enable_do_as == 'true':
             validationItems.append({"config-name": "hive.server2.enable.doAs","item": self.getErrorItem("Value should be set to 'false' for Hive2.")})
 
+        # Validate that 'Maximum Total Concurrent Queries' (hive.server2.tez.sessions.per.default.queue) is not consuming more than
+        # 50% of the queue selected for LLAP.
+        if current_selected_queue_for_llap_cap and 'hive.server2.tez.sessions.per.default.queue' in \
+          services['configurations']['hive-interactive-site']['properties']:
+          num_tez_sessions = services['configurations']['hive-interactive-site']['properties']['hive.server2.tez.sessions.per.default.queue']
+          if num_tez_sessions:
+            num_tez_sessions = long(num_tez_sessions)
+            yarn_min_container_size = self.get_yarn_min_container_size(services, configurations)
+            tez_am_container_size = self.calculate_tez_am_container_size(long(total_cluster_capacity))
+            normalized_tez_am_container_size = self._normalizeUp(tez_am_container_size, yarn_min_container_size)
+            llap_selected_queue_cap_remaining = current_selected_queue_for_llap_cap - (normalized_tez_am_container_size * num_tez_sessions)
+            if llap_selected_queue_cap_remaining <= current_selected_queue_for_llap_cap/2:
+              errMsg3 = " Reducing the 'Maximum Total Concurrent Queries' (value: {0}) is advisable as it is consuming more than 50% of " \
+                        "'{1}' queue for LLAP.".format(num_tez_sessions, curr_selected_queue_for_llap)
+              validationItems.append({"config-name": "hive.server2.tez.sessions.per.default.queue","item": self.getWarnItem(errMsg3)})
+
      # Validate that 'remaining available capacity' in the cluster is at least 512 MB, after the 'llap' queue is selected,
       # in order to run Service Checks.
       if curr_selected_queue_for_llap and curr_selected_queue_for_llap_cap_perc and \
           curr_selected_queue_for_llap == self.AMBARI_MANAGED_LLAP_QUEUE_NAME:
-        # Get total cluster capacity
-        node_manager_host_list = self.get_node_manager_hosts(services, hosts)
-        node_manager_cnt = len(node_manager_host_list)
-        yarn_nm_mem_in_mb = self.get_yarn_nm_mem_in_mb(services, configurations)
-        total_cluster_capacity = node_manager_cnt * yarn_nm_mem_in_mb
         curr_selected_queue_for_llap_cap = float(curr_selected_queue_for_llap_cap_perc) / 100 * total_cluster_capacity
         available_cap_in_cluster = total_cluster_capacity - curr_selected_queue_for_llap_cap
         if available_cap_in_cluster < MIN_ASSUMED_CAP_REQUIRED_FOR_SERVICE_CHECKS:
-          errMsg3 =  "Capacity used by '{0}' queue is '{1}'. Service checks may not run as remaining available capacity " \
+          errMsg4 =  "Capacity used by '{0}' queue is '{1}'. Service checks may not run as remaining available capacity " \
                      "({2}) in cluster is less than 512 MB.".format(self.AMBARI_MANAGED_LLAP_QUEUE_NAME, curr_selected_queue_for_llap_cap, available_cap_in_cluster)
-          validationItems.append({"config-name": "hive.llap.daemon.queue.name","item": self.getWarnItem(errMsg3)})
-
+          validationItems.append({"config-name": "hive.llap.daemon.queue.name","item": self.getWarnItem(errMsg4)})
 
     validationProblems = self.toConfigurationValidationProblems(validationItems, "hive-interactive-site")
     return validationProblems
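
The new 50% validation reduces to a single inequality: the queue capacity left after reserving one normalized Tez AM container per session must exceed half the queue. A standalone sketch of that check; all sizes are hypothetical, and normalize_up only approximates the advisor's _normalizeUp helper:

def normalize_up(value, multiple):
    # Round 'value' up to the nearest multiple of 'multiple'.
    return ((value + multiple - 1) // multiple) * multiple

selected_queue_cap_mb = 40960   # assumed absolute capacity of the LLAP queue
num_tez_sessions = 32           # hive.server2.tez.sessions.per.default.queue
yarn_min_container_mb = 1024    # yarn.scheduler.minimum-allocation-mb
tez_am_container_mb = 2048      # assumed Tez AM container size

normalized_am_mb = normalize_up(tez_am_container_mb, yarn_min_container_mb)
remaining_mb = selected_queue_cap_mb - normalized_am_mb * num_tez_sessions
if remaining_mb <= selected_queue_cap_mb / 2:
    print("WARN: Tez AMs would consume more than 50% of the LLAP queue")
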
@@ -626,30 +646,32 @@ class HDP25StackAdvisor(HDP24StackAdvisor):
 
       node_manager_host_list = self.get_node_manager_hosts(services, hosts)
       node_manager_cnt = len(node_manager_host_list)
-      # Check which queue is selected in 'hive.llap.daemon.queue.name', to determine current queue capacity
-      current_selected_queue_for_llap_cap = None
-      if llap_queue_selected_in_current_call == llap_queue_name or llap_daemon_selected_queue_name == llap_queue_name:
-        current_selected_queue_for_llap_cap = self.get_llap_cap_percent_slider(services, configurations)
-      else:  # any queue other than 'llap'
-        current_selected_queue_for_llap_cap = self.__getQueueCapFromCapacityScheduler(capacity_scheduler_properties, llap_daemon_selected_queue_name)
-
-      assert (current_selected_queue_for_llap_cap >= 1), "Current selected queue '{0}' capacity value : {1}. Expected value : >= 1" \
-        .format(llap_daemon_selected_queue_name, current_selected_queue_for_llap_cap)
-
       yarn_nm_mem_in_mb = self.get_yarn_nm_mem_in_mb(services, configurations)
       total_cluster_capacity = node_manager_cnt * yarn_nm_mem_in_mb
       Logger.info("\n\nCalculated total_cluster_capacity : {0}, using following : node_manager_cnt : {1}, "
                   "yarn_nm_mem_in_mb : {2}".format(total_cluster_capacity, node_manager_cnt, yarn_nm_mem_in_mb))
 
+      # Check which queue is selected in 'hive.llap.daemon.queue.name', to determine current queue capacity
+      current_selected_queue_for_llap_cap = None
+      yarn_root_queues = capacity_scheduler_properties.get("yarn.scheduler.capacity.root.queues")
+      if llap_queue_selected_in_current_call == llap_queue_name \
+        or (llap_daemon_selected_queue_name == llap_queue_name
+            and llap_queue_name in yarn_root_queues and len(leafQueueNames) == 2):
+        current_selected_queue_for_llap_cap_perc = self.get_llap_cap_percent_slider(services, configurations)
+        current_selected_queue_for_llap_cap = float(current_selected_queue_for_llap_cap_perc) / 100 * total_cluster_capacity
+      else:  # any queue other than 'llap'
+        current_selected_queue_for_llap_cap = self.__getSelectedQueueTotalCap(capacity_scheduler_properties,
+                                                                              llap_daemon_selected_queue_name, total_cluster_capacity)
+      assert (current_selected_queue_for_llap_cap >= 1), "Current selected queue '{0}' capacity value : {1}. Expected value : >= 1" \
+        .format(llap_daemon_selected_queue_name, current_selected_queue_for_llap_cap)
       yarn_min_container_size = self.get_yarn_min_container_size(services, configurations)
       tez_am_container_size = self.calculate_tez_am_container_size(long(total_cluster_capacity))
       normalized_tez_am_container_size = self._normalizeUp(tez_am_container_size, yarn_min_container_size)
       Logger.info("Calculated normalized_tez_am_container_size : {0}, using following : tez_am_container_size : {1}, "
                   "total_cluster_capacity : {2}".format(normalized_tez_am_container_size, tez_am_container_size,
                                                         total_cluster_capacity))
-      total_llap_queue_size = long(
-        self._normalizeDown((float(current_selected_queue_for_llap_cap) / 100 * total_cluster_capacity),
-                            yarn_min_container_size))
+      normalized_selected_queue_for_llap_cap = long(self._normalizeDown(current_selected_queue_for_llap_cap, yarn_min_container_size))
+
       # Get calculated value for Slider AM container Size
       slider_am_container_size = self._normalizeUp(self.calculate_slider_am_size(yarn_min_container_size),
                                                    yarn_min_container_size)
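
Both normalization helpers used above round a size to a multiple of the YARN minimum container size: up for container requests, down so a queue is never credited with a fractional container. A sketch of the assumed rounding behavior (the real _normalizeUp/_normalizeDown live in the base stack advisor):

import math

def normalize_up(value, multiple):
    # Assumed behavior of _normalizeUp: round up to a container-size multiple.
    return int(math.ceil(float(value) / multiple) * multiple)

def normalize_down(value, multiple):
    # Assumed behavior of _normalizeDown: round down to a container-size multiple.
    return int(math.floor(float(value) / multiple) * multiple)

yarn_min_container_size = 1024
print(normalize_up(1500, yarn_min_container_size))    # 2048
print(normalize_down(10500, yarn_min_container_size)) # 10240
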
@@ -657,10 +679,10 @@ class HDP25StackAdvisor(HDP24StackAdvisor):
       # Read 'hive.server2.tez.sessions.per.default.queue' prop if it's in changed-configs, else calculate it.
       if not llap_concurrency_in_changed_configs:
         # Calculate llap concurrency (i.e. Number of Tez AM's)
-        llap_concurrency = float(total_llap_queue_size * 0.25 / normalized_tez_am_container_size)
+        llap_concurrency = float(normalized_selected_queue_for_llap_cap * 0.25 / normalized_tez_am_container_size)
         llap_concurrency = max(long(llap_concurrency), 1)
-        Logger.info("Calculated llap_concurrency : {0}, using following : total_llap_queue_size : {1}, "
-                    "normalized_tez_am_container_size : {2}".format(llap_concurrency, total_llap_queue_size,
+        Logger.info("Calculated llap_concurrency : {0}, using following : normalized_selected_queue_for_llap_cap : {1}, "
+                    "normalized_tez_am_container_size : {2}".format(llap_concurrency, normalized_selected_queue_for_llap_cap,
                                                                     normalized_tez_am_container_size))
        # Limit 'llap_concurrency' to a maximum of 32.
         if llap_concurrency > LLAP_MAX_CONCURRENCY:
@@ -681,14 +703,14 @@ class HDP25StackAdvisor(HDP24StackAdvisor):
 
       # Calculate 'total memory available for llap daemons' across cluster
       total_am_capacity_required = normalized_tez_am_container_size * llap_concurrency + slider_am_container_size
-      cap_available_for_daemons = total_llap_queue_size - total_am_capacity_required
+      cap_available_for_daemons = normalized_selected_queue_for_llap_cap - total_am_capacity_required
       Logger.info(
         "Calculated cap_available_for_daemons : {0}, using following : current_selected_queue_for_llap_cap : {1}, "
-        "yarn_nm_mem_in_mb : {2}, total_cluster_capacity : {3}, total_llap_queue_size : {4}, normalized_tez_am_container_size"
+        "yarn_nm_mem_in_mb : {2}, total_cluster_capacity : {3}, normalized_selected_queue_for_llap_cap : {4}, normalized_tez_am_container_size"
         " : {5}, yarn_min_container_size : {6}, llap_concurrency : {7}, total_am_capacity_required : {8}"
         .format(cap_available_for_daemons, current_selected_queue_for_llap_cap, yarn_nm_mem_in_mb,
                 total_cluster_capacity,
-                total_llap_queue_size, normalized_tez_am_container_size, yarn_min_container_size, llap_concurrency,
+                normalized_selected_queue_for_llap_cap, normalized_tez_am_container_size, yarn_min_container_size, llap_concurrency,
                 total_am_capacity_required))
       if cap_available_for_daemons < yarn_min_container_size:
         raise Fail(
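
Condensed, the concurrency recommendation reserves roughly a quarter of the normalized queue size for Tez AMs and clamps the result to [1, 32]. A sketch with assumed sizes (LLAP_MAX_CONCURRENCY is 32 in the advisor):

LLAP_MAX_CONCURRENCY = 32

normalized_selected_queue_for_llap_cap = 81920  # assumed queue size, MB
normalized_tez_am_container_size = 2048         # assumed Tez AM size, MB

# Reserve ~25% of the queue for Tez AMs; always allow at least one session.
llap_concurrency = max(int(normalized_selected_queue_for_llap_cap * 0.25
                           / normalized_tez_am_container_size), 1)
llap_concurrency = min(llap_concurrency, LLAP_MAX_CONCURRENCY)
print(llap_concurrency)  # 10
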
@@ -1409,19 +1431,19 @@ class HDP25StackAdvisor(HDP24StackAdvisor):
                    "'hive.server2.tez.default.queues' property attributes.")
 
   """
-  Retrieves the passed in queue's 'capacity' from Capacity Scheduler.
+  Retrieves the passed-in queue's 'capacity'-related key from Capacity Scheduler.
   """
-  def __getQueueCapFromCapacityScheduler(self, capacity_scheduler_properties, llap_daemon_selected_queue_name):
+  def __getQueueCapacityKeyFromCapacityScheduler(self, capacity_scheduler_properties, llap_daemon_selected_queue_name):
     # Identify the key which contains the capacity for 'llap_daemon_selected_queue_name'.
     cap_sched_keys = capacity_scheduler_properties.keys()
     llap_selected_queue_cap_key =  None
     current_selected_queue_for_llap_cap = None
     for key in cap_sched_keys:
+      # Expected capacity prop key is of form : 'yarn.scheduler.capacity.<one or more queues in path separated by '.'>.[llap_daemon_selected_queue_name].capacity'
       if key.endswith(llap_daemon_selected_queue_name+".capacity"):
         llap_selected_queue_cap_key = key
         break;
-    current_selected_queue_for_llap_cap = capacity_scheduler_properties.get(llap_selected_queue_cap_key)
-    return current_selected_queue_for_llap_cap
+    return llap_selected_queue_cap_key
 
   """
   Retrieves the passed in queue's 'state' from Capacity Scheduler.
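
The renamed helper now returns the capacity-scheduler property key rather than its value, which is what lets the multi-level walk added below resolve each ancestor's percentage separately. A standalone sketch of the key scan (property names follow the capacity-scheduler format; the values are made up):

capacity_scheduler_properties = {
    "yarn.scheduler.capacity.root.queues": "default",
    "yarn.scheduler.capacity.root.default.capacity": "100",
    "yarn.scheduler.capacity.root.default.a.capacity": "20",
    "yarn.scheduler.capacity.root.default.b.capacity": "80",
}

def get_queue_capacity_key(props, queue_name):
    # The first key ending in '<queue_name>.capacity' names that queue's capacity prop.
    for key in props.keys():
        if key.endswith(queue_name + ".capacity"):
            return key
    return None

print(get_queue_capacity_key(capacity_scheduler_properties, "b"))
# yarn.scheduler.capacity.root.default.b.capacity
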
@@ -1438,6 +1460,37 @@ class HDP25StackAdvisor(HDP24StackAdvisor):
     llap_selected_queue_state = capacity_scheduler_properties.get(llap_selected_queue_state_key)
     return llap_selected_queue_state
 
+  """
+  Calculates the total available capacity for the passed-in YARN queue, at any nesting level, by multiplying the capacity percentages along its path.
+  """
+  def __getSelectedQueueTotalCap(self, capacity_scheduler_properties, llap_daemon_selected_queue_name, total_cluster_capacity):
+    Logger.info("Entered __getSelectedQueueTotalCap fn().")
+    available_capacity = total_cluster_capacity
+    queue_cap_key = self.__getQueueCapacityKeyFromCapacityScheduler(capacity_scheduler_properties, llap_daemon_selected_queue_name)
+    if queue_cap_key:
+      queue_cap_key = queue_cap_key.strip()
+      if len(queue_cap_key) >= 34: # len('yarn.scheduler.capacity.<single letter queue name>.capacity') = 34
+        # Expected capacity prop key is of form : 'yarn.scheduler.capacity.<one or more queues (path)>.capacity'
+        queue_path = queue_cap_key[24:] # Strip from beginning 'yarn.scheduler.capacity.'
+        queue_path = queue_path[0:-9] # Strip from end '.capacity'
+        queues_list = queue_path.split('.')
+        Logger.info("Queue list : {0}".format(queues_list))
+        if queues_list:
+          for queue in queues_list:
+            queue_cap_key = self.__getQueueCapacityKeyFromCapacityScheduler(capacity_scheduler_properties, queue)
+            queue_cap_perc = float(capacity_scheduler_properties.get(queue_cap_key))
+            available_capacity = queue_cap_perc / 100 * available_capacity
+            Logger.info("Total capacity available for queue {0} is : {1}".format(queue, available_capacity))
+        else:
+          raise Fail("Retrieved 'queue list' from capacity-scheduler is empty while doing '{0}' queue capacity caluclation."
+                     .format(llap_daemon_selected_queue_name))
+      else:
+        raise Fail("Expected length for queue_cap_key(val:{0}) should be greater than atleast 34.".format(queue_cap_key))
+    else:
+      raise Fail("Couldn't retrieve {0}' queue capacity KEY from capacity-scheduler while doing capacity caluclation.".format(llap_daemon_selected_queue_name))
+    # returns the capacity calculated for passed-in queue in 'llap_daemon_selected_queue_name'.
+    return available_capacity
+
   def recommendRangerKMSConfigurations(self, configurations, clusterData, services, hosts):
     super(HDP25StackAdvisor, self).recommendRangerKMSConfigurations(configurations, clusterData, services, hosts)
     servicesList = [service["StackServices"]["service_name"] for service in services["services"]]
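
Put together, the multi-level fix multiplies capacity percentages down the queue path instead of reading a single leaf percentage. A condensed, standalone sketch of that walk for a hypothetical root -> default -> b hierarchy:

capacity_scheduler_properties = {
    "yarn.scheduler.capacity.root.capacity": "100",
    "yarn.scheduler.capacity.root.default.capacity": "100",
    "yarn.scheduler.capacity.root.default.b.capacity": "80",
}

def selected_queue_total_cap(props, queue_name, total_cluster_capacity):
    # Locate the leaf queue's capacity key, derive the queue path from it, then
    # multiply each level's percentage into the running capacity.
    leaf_key = next((k for k in props if k.endswith(queue_name + ".capacity")), None)
    if not leaf_key:
        return None
    queue_path = leaf_key[len("yarn.scheduler.capacity."):-len(".capacity")]
    available = float(total_cluster_capacity)
    for queue in queue_path.split("."):
        key = next(k for k in props if k.endswith(queue + ".capacity"))
        available = float(props[key]) / 100 * available
    return available

print(selected_queue_total_cap(capacity_scheduler_properties, "b", 102400))
# 81920.0 = 100% (root) * 100% (default) * 80% (b) of 102400 MB
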

+ 8 - 1
ambari-server/src/test/python/stacks/2.5/common/services-normal-his-2-hosts.json

@@ -1050,7 +1050,14 @@
     "hive-interactive-site" : {
       "properties": {
         "hive.llap.daemon.queue.name": "llap",
-        "hive.server2.enable.doAs": "true"
+        "hive.server2.enable.doAs": "true",
+        "hive.llap.daemon.num.executors": "0",
+        "hive.server2.tez.sessions.per.default.queue": "32"
+      }
+    },
+    "hive-interactive-env" : {
+      "properties": {
+        "num_llap_nodes": "0"
       }
     },
     "capacity-scheduler" : {

+ 9 - 8
ambari-server/src/test/python/stacks/2.5/common/test_stack_advisor.py

@@ -556,13 +556,17 @@ class TestHDP25StackAdvisor(TestCase):
     # Expected : 1. Error telling about the current size compared to minimum required size.
     #            2. Error telling about current state can't be STOPPED. Expected : RUNNING.
     #            3. Error telling about config 'hive.server2.enable.doAs' to be false at all times.
+    #            4. Warning that config 'hive.server2.tez.sessions.per.default.queue' is consuming more
+    #               than 50% of the queue capacity selected for LLAP.
     services2 = self.load_json("services-normal-his-2-hosts.json")
     res_expected2 = [
       {'config-type': 'hive-interactive-site', 'message': "Selected queue 'llap' capacity (49%) is less than minimum required "
                                                           "capacity (50%) for LLAP app to run", 'type': 'configuration', 'config-name': 'hive.llap.daemon.queue.name', 'level': 'ERROR'},
       {'config-type': 'hive-interactive-site', 'message': "Selected queue 'llap' current state is : 'STOPPED'. It is required to be in "
                                                           "'RUNNING' state for LLAP to run", 'type': 'configuration', 'config-name': 'hive.llap.daemon.queue.name', 'level': 'ERROR'},
-      {'config-type': 'hive-interactive-site', 'message': "Value should be set to 'false' for Hive2.", 'type': 'configuration', 'config-name': 'hive.server2.enable.doAs', 'level': 'ERROR'}
+      {'config-type': 'hive-interactive-site', 'message': "Value should be set to 'false' for Hive2.", 'type': 'configuration', 'config-name': 'hive.server2.enable.doAs', 'level': 'ERROR'},
+      {'config-type': 'hive-interactive-site', 'message': " Reducing the 'Maximum Total Concurrent Queries' (value: 32) is advisable as it is consuming more than 50% of 'llap' queue for "
+                                                          "LLAP.", 'type': 'configuration', 'config-name': 'hive.server2.tez.sessions.per.default.queue', 'level': 'WARN'}
     ]
     res2 = self.stackAdvisor.validateHiveInteractiveSiteConfigurations({}, {}, {}, services2, hosts)
     self.assertEquals(res2, res_expected2)
@@ -3746,7 +3750,6 @@ class TestHDP25StackAdvisor(TestCase):
     }
 
     self.stackAdvisor.recommendYARNConfigurations(configurations, clusterData, services, self.hosts)
-
     self.assertEqual(configurations['hive-interactive-env']['properties']['num_llap_nodes'], '1')
 
     self.assertEqual(configurations['hive-interactive-site']['properties']['hive.llap.daemon.yarn.container.mb'], '5115')
@@ -4726,7 +4729,6 @@ class TestHDP25StackAdvisor(TestCase):
 
 
     self.stackAdvisor.recommendYARNConfigurations(configurations, clusterData, services, self.hosts)
-
     self.assertEqual(configurations['hive-interactive-site']['properties']['hive.server2.tez.sessions.per.default.queue'], '15')
     self.assertEquals(configurations['hive-interactive-site']['property_attributes']['hive.server2.tez.sessions.per.default.queue'], {'minimum': '1', 'maximum': '32'})
 
@@ -5609,7 +5611,7 @@ class TestHDP25StackAdvisor(TestCase):
                                   "yarn.scheduler.capacity.root.default.a.llap.user-limit-factor=1\n"
                                   "yarn.scheduler.capacity.root.default.a.acl_administer_queue=*\n"
                                   "yarn.scheduler.capacity.root.default.a.acl_submit_applications=*\n"
-                                  "yarn.scheduler.capacity.root.default.a.capacity=0\n"
+                                  "yarn.scheduler.capacity.root.default.a.capacity=20\n"
                                   "yarn.scheduler.capacity.root.default.a.maximum-capacity=0\n"
                                   "yarn.scheduler.capacity.root.default.a.minimum-user-limit-percent=100\n"
                                   "yarn.scheduler.capacity.root.default.a.ordering-policy=fifo\n"
@@ -5619,7 +5621,7 @@ class TestHDP25StackAdvisor(TestCase):
                                   "yarn.scheduler.capacity.root.default.acl_submit_applications=*\n"
                                   "yarn.scheduler.capacity.root.default.b.acl_administer_queue=*\n"
                                   "yarn.scheduler.capacity.root.default.b.acl_submit_applications=*\n"
-                                  "yarn.scheduler.capacity.root.default.b.capacity=100\n"
+                                  "yarn.scheduler.capacity.root.default.b.capacity=80\n"
                                   "yarn.scheduler.capacity.root.default.b.maximum-capacity=100\n"
                                   "yarn.scheduler.capacity.root.default.b.minimum-user-limit-percent=100\n"
                                   "yarn.scheduler.capacity.root.default.b.ordering-policy=fifo\n"
@@ -5643,7 +5645,7 @@ class TestHDP25StackAdvisor(TestCase):
         "hive-interactive-site":
           {
             'properties': {
-              'hive.llap.daemon.queue.name': 'default.b',
+              'hive.llap.daemon.queue.name': 'b',
               'hive.server2.tez.sessions.per.default.queue': '1'
             }
           },
@@ -5689,8 +5691,7 @@ class TestHDP25StackAdvisor(TestCase):
     configurations = {
     }
     self.stackAdvisor.recommendYARNConfigurations(configurations, clusterData, services, self.hosts)
-
-    self.assertEqual(configurations['hive-interactive-env']['properties']['num_llap_nodes'], '3')
+    self.assertEqual(configurations['hive-interactive-env']['properties']['num_llap_nodes'], '2')
 
     self.assertEqual(configurations['hive-interactive-site']['properties']['hive.llap.daemon.yarn.container.mb'], '10240')