소스 검색

AMBARI-17702. Fix the stack_advisor so that LLAP can start when a nested queue is selected.

Swapan Shridhar 9 년 전
부모
커밋
b485fa3256

+ 11 - 4
ambari-server/src/main/resources/stacks/HDP/2.0.6/services/stack_advisor.py

@@ -1633,10 +1633,17 @@ class HDP206StackAdvisor(DefaultStackAdvisor):
           for subQueue in subQueues:
             toProcessQueues.append(queue + "." + subQueue)
         else:
-          # Leaf queue path
-          # eg: if queue depth for leaf queue 'd1' is like : 'yarn.scheduler.capacity.root.a1.b1.c1.d1',
-          # added leaf queue path is : 'a1.b1.c1.d1'
-          leafQueueNames.add(queue)
+          # Leaf queues
+          # We only take the leaf queue name instead of the complete path, as leaf queue names are unique in YARN.
+          # Eg: If YARN queues are like :
+          #     (1). 'yarn.scheduler.capacity.root.a1.b1.c1.d1',
+          #     (2). 'yarn.scheduler.capacity.root.a1.b1.c2',
+          #     (3). 'yarn.scheduler.capacity.root.default',
+          # the leaf queue names added are: d1, c2 and default for the 3 leaf queues above.
+          leafQueuePathSplits = queue.split(".")
+          if leafQueuePathSplits > 0:
+            leafQueueName = leafQueuePathSplits[-1]
+            leafQueueNames.add(leafQueueName)
     return leafQueueNames
 
 def getOldValue(self, services, configType, propertyName):

+ 36 - 7
ambari-server/src/main/resources/stacks/HDP/2.5/services/stack_advisor.py

@@ -181,7 +181,7 @@ class HDP25StackAdvisor(HDP24StackAdvisor):
             'hive.llap.daemon.queue.name' in services['configurations'][self.HIVE_INTERACTIVE_SITE]['properties']:
           curr_selected_queue_for_llap = services['configurations'][self.HIVE_INTERACTIVE_SITE]['properties']['hive.llap.daemon.queue.name']
           if curr_selected_queue_for_llap:
-            curr_selected_queue_for_llap_cap_perc = capacity_scheduler_properties.get('yarn.scheduler.capacity.root.'+curr_selected_queue_for_llap+'.capacity')
+            curr_selected_queue_for_llap_cap_perc = self.__getQueueCapFromCapacityScheduler(capacity_scheduler_properties, curr_selected_queue_for_llap)
             if curr_selected_queue_for_llap_cap_perc:
               curr_selected_queue_for_llap_cap_perc = int(float(curr_selected_queue_for_llap_cap_perc))
               min_reqd_queue_cap_perc = self.min_queue_perc_reqd_for_llap_and_hive_app(services, hosts, configurations)
@@ -197,7 +197,7 @@ class HDP25StackAdvisor(HDP24StackAdvisor):
                "Hive Server Interactive.".format(curr_selected_queue_for_llap))
 
             # Validate that current selected queue in 'hive.llap.daemon.queue.name' state is not STOPPED.
-            llap_selected_queue_state = capacity_scheduler_properties.get('yarn.scheduler.capacity.root.'+curr_selected_queue_for_llap+".state")
+            llap_selected_queue_state = self.__getQueueStateFromCapacityScheduler(capacity_scheduler_properties, curr_selected_queue_for_llap)
             if llap_selected_queue_state:
               if llap_selected_queue_state == "STOPPED":
                 errMsg2 =  "Selected queue '{0}' current state is : '{1}'. It is required to be in 'RUNNING' state for LLAP to run"\
@@ -564,7 +564,7 @@ class HDP25StackAdvisor(HDP24StackAdvisor):
           Logger.info("Queue selected for LLAP app is : '{0}'. Current YARN queues : {1}. Setting '{2}' queue capacity slider "
                       "visibility to 'False'.".format(llap_daemon_selected_queue_name, list(leafQueueNames), llap_queue_name))
         if llap_daemon_selected_queue_name:
-          llap_selected_queue_state = capacity_scheduler_properties.get('yarn.scheduler.capacity.root.'+llap_daemon_selected_queue_name+".state")
+          llap_selected_queue_state = self.__getQueueStateFromCapacityScheduler(capacity_scheduler_properties, llap_daemon_selected_queue_name)
           if llap_selected_queue_state == None or llap_selected_queue_state == "STOPPED":
             putHiveInteractiveEnvPropertyAttribute("llap_queue_capacity", "visible", "false")
             raise Fail("Selected LLAP app queue '{0}' current state is : '{1}'. Setting LLAP configs to default values "
@@ -612,11 +612,10 @@ class HDP25StackAdvisor(HDP24StackAdvisor):
       if llap_queue_selected_in_current_call == llap_queue_name or llap_daemon_selected_queue_name == llap_queue_name:
         current_selected_queue_for_llap_cap = self.get_llap_cap_percent_slider(services, configurations)
       else:  # any queue other than 'llap'
-        current_selected_queue_for_llap_cap = capacity_scheduler_properties.get(
-          'yarn.scheduler.capacity.root.' + llap_daemon_selected_queue_name + '.capacity')
+        current_selected_queue_for_llap_cap = self.__getQueueCapFromCapacityScheduler(capacity_scheduler_properties, llap_daemon_selected_queue_name)
 
-      assert (current_selected_queue_for_llap_cap >= 1), "Current selected  current value : {0}. Expected value : >= 1" \
-        .format(current_selected_queue_for_llap_cap)
+      assert (current_selected_queue_for_llap_cap >= 1), "Current selected queue '{0}' capacity value : {1}. Expected value : >= 1" \
+        .format(llap_daemon_selected_queue_name, current_selected_queue_for_llap_cap)
 
       yarn_nm_mem_in_mb = self.get_yarn_nm_mem_in_mb(services, configurations)
       total_cluster_capacity = node_manager_cnt * yarn_nm_mem_in_mb
@@ -1390,6 +1389,36 @@ class HDP25StackAdvisor(HDP24StackAdvisor):
       Logger.error("Problem retrieving YARN queues. Skipping updating HIVE Server Interactve "
                    "'hive.server2.tez.default.queues' property attributes.")
 
+  """
+  Retrieves the passed in queue's 'capacity' from Capacity Scheduler.
+  """
+  def __getQueueCapFromCapacityScheduler(self, capacity_scheduler_properties, llap_daemon_selected_queue_name):
+    # Identify the key which contains the capacity for 'llap_daemon_selected_queue_name'.
+    cap_sched_keys = capacity_scheduler_properties.keys()
+    llap_selected_queue_cap_key =  None
+    current_selected_queue_for_llap_cap = None
+    for key in cap_sched_keys:
+      if key.endswith(llap_daemon_selected_queue_name+".capacity"):
+        llap_selected_queue_cap_key = key
+        break;
+    current_selected_queue_for_llap_cap = capacity_scheduler_properties.get(llap_selected_queue_cap_key)
+    return current_selected_queue_for_llap_cap
+
+  """
+  Retrieves the passed in queue's 'state' from Capacity Scheduler.
+  """
+  def __getQueueStateFromCapacityScheduler(self, capacity_scheduler_properties, llap_daemon_selected_queue_name):
+    # Identify the key which contains the state for 'llap_daemon_selected_queue_name'.
+    cap_sched_keys = capacity_scheduler_properties.keys()
+    llap_selected_queue_state_key =  None
+    llap_selected_queue_state = None
+    for key in cap_sched_keys:
+      if key.endswith(llap_daemon_selected_queue_name+".state"):
+        llap_selected_queue_state_key = key
+        break;
+    llap_selected_queue_state = capacity_scheduler_properties.get(llap_selected_queue_state_key)
+    return llap_selected_queue_state
+
   def recommendRangerKMSConfigurations(self, configurations, clusterData, services, hosts):
     super(HDP25StackAdvisor, self).recommendRangerKMSConfigurations(configurations, clusterData, services, hosts)
     servicesList = [service["StackServices"]["service_name"] for service in services["services"]]

+ 6 - 6
ambari-server/src/test/python/stacks/2.5/common/test_stack_advisor.py

@@ -288,16 +288,16 @@ class TestHDP25StackAdvisor(TestCase):
           "hive.llap.daemon.queue.name": {
             "entries": [
               {
-                "value": "default.a.a1",
-                "label": "default.a.a1"
+                "value": "a1",
+                "label": "a1"
               },
               {
-                "value": "default.a.llap",
-                "label": "default.a.llap"
+                "value": "b",
+                "label": "b"
               },
               {
-                "value": "default.b",
-                "label": "default.b"
+                "value": "llap",
+                "label": "llap"
               }
             ]
           }