@@ -33,6 +33,7 @@ class HDP25StackAdvisor(HDP24StackAdvisor):
self.HIVE_INTERACTIVE_SITE = 'hive-interactive-site'
self.YARN_ROOT_DEFAULT_QUEUE_NAME = 'default'
self.AMBARI_MANAGED_LLAP_QUEUE_NAME = 'llap'
+ self.CONFIG_VALUE_UINITIALIZED = 'SET_ON_FIRST_INVOCATION'

def recommendOozieConfigurations(self, configurations, clusterData, services, hosts):
super(HDP25StackAdvisor,self).recommendOozieConfigurations(configurations, clusterData, services, hosts)
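The CONFIG_VALUE_UINITIALIZED sentinel added above drives a read-or-compute pattern used throughout the rest of this patch: a property ships with the placeholder string, the advisor computes a real value on the first pass, and any later stored or user-edited value is returned untouched. A minimal sketch of that pattern, with illustrative names rather than Ambari API:

    CONFIG_VALUE_UINITIALIZED = 'SET_ON_FIRST_INVOCATION'

    def read_or_compute(raw_value, compute):
        # First invocation: the stored value is still the sentinel, so compute one.
        if raw_value == CONFIG_VALUE_UINITIALIZED:
            return float(compute())
        # Subsequent invocations: respect the stored / user-provided value.
        return float(raw_value)

    # read_or_compute('SET_ON_FIRST_INVOCATION', lambda: 512)  -> 512.0
    # read_or_compute('1024', lambda: 512)                     -> 1024.0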
@@ -352,7 +353,7 @@ class HDP25StackAdvisor(HDP24StackAdvisor):
if num_tez_sessions:
num_tez_sessions = long(num_tez_sessions)
yarn_min_container_size = self.get_yarn_min_container_size(services, configurations)
- tez_am_container_size = self.calculate_tez_am_container_size(long(total_cluster_capacity))
+ tez_am_container_size = self.calculate_tez_am_container_size(services, long(total_cluster_capacity))
normalized_tez_am_container_size = self._normalizeUp(tez_am_container_size, yarn_min_container_size)
llap_selected_queue_cap_remaining = current_selected_queue_for_llap_cap - (normalized_tez_am_container_size * num_tez_sessions)
if llap_selected_queue_cap_remaining <= current_selected_queue_for_llap_cap/2:
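The capacity arithmetic in this hunk leans on the advisor's _normalizeUp/_normalizeDown helpers, which round a memory figure to a multiple of the YARN minimum container size, since that is what a container actually occupies once YARN rounds the request. A standalone sketch of the assumed behavior:

    import math

    def normalize_up(value, granularity):
        # Round up to the next multiple of 'granularity' (yarn.scheduler.minimum-allocation-mb).
        return math.ceil(float(value) / granularity) * granularity

    def normalize_down(value, granularity):
        # Round down to a multiple of 'granularity'.
        return math.floor(float(value) / granularity) * granularity

    # With a 1024 MB minimum allocation:
    # normalize_up(1536, 1024)   -> 2048.0  (what a 1536 MB Tez AM really costs)
    # normalize_down(2500, 1024) -> 2048.0  (usable part of 2500 MB of queue capacity)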
@@ -704,7 +705,7 @@ class HDP25StackAdvisor(HDP24StackAdvisor):
# Update 'hive.llap.daemon.queue.name' property attributes if capacity scheduler is changed.
if self.HIVE_INTERACTIVE_SITE in services['configurations']:
if 'hive.llap.daemon.queue.name' in services['configurations'][self.HIVE_INTERACTIVE_SITE]['properties']:
- self.setLlapDaemonQueuePropAttributesAndCapSliderVisibility(services, configurations)
+ self.setLlapDaemonQueuePropAttributes(services, configurations)

# Update 'hive.server2.tez.default.queues' value
hive_tez_default_queue = None
@@ -720,7 +721,7 @@ class HDP25StackAdvisor(HDP24StackAdvisor):
Logger.info("Updated 'hive.server2.tez.default.queues' config : '{0}'".format(hive_tez_default_queue))
else:
putHiveInteractiveEnvProperty('enable_hive_interactive', 'false')
- putHiveInteractiveEnvPropertyAttribute("llap_queue_capacity", "visible", "false")
+ putHiveInteractiveEnvPropertyAttribute("num_llap_nodes", "visible", "false")

if self.HIVE_INTERACTIVE_SITE in services['configurations'] and \
'hive.llap.zk.sm.connectionString' in services['configurations'][self.HIVE_INTERACTIVE_SITE]['properties']:
@@ -741,7 +742,6 @@ class HDP25StackAdvisor(HDP24StackAdvisor):

# Hive Server interactive is already added or getting added
if enable_hive_interactive == 'true':
- self.checkAndManageLlapQueue(services, configurations, hosts, LLAP_QUEUE_NAME)
self.updateLlapConfigs(configurations, services, hosts, LLAP_QUEUE_NAME)
else: # When Hive Interactive Server is in 'off/removed' state.
self.checkAndStopLlapQueue(services, configurations, LLAP_QUEUE_NAME)
@@ -766,17 +766,22 @@ class HDP25StackAdvisor(HDP24StackAdvisor):

"""
Entry point for updating Hive's 'LLAP app' configs namely : (1). num_llap_nodes (2). hive.llap.daemon.yarn.container.mb
- (3). hive.llap.daemon.num.executors (4). hive.llap.io.memory.size (5). llap_heap_size (6). slider_am_container_mb,
- and (7). hive.server2.tez.sessions.per.default.queue
+ (3). hive.llap.daemon.num.executors (4). hive.llap.io.memory.size (5). llap_heap_size (6). slider_am_container_mb,
+ (7). hive.server2.tez.sessions.per.default.queue, (8). tez.am.resource.memory.mb (9). hive.tez.container.size
+ (10). tez.runtime.io.sort.mb (11). tez.runtime.unordered.output.buffer.size-mb (12). hive.llap.io.threadpool.size, and
+ (13). hive.llap.io.enabled.

The trigger point for updating LLAP configs (mentioned above) is change in values of any of the following:
- (1). 'enable_hive_interactive' set to 'true' (2). 'llap_queue_capacity' (3). 'hive.server2.tez.sessions.per.default.queue'
+ (1). 'enable_hive_interactive' set to 'true' (2). 'num_llap_nodes' (3). 'hive.server2.tez.sessions.per.default.queue'
(4). Change in queue selection for config 'hive.llap.daemon.queue.name'.

- If change in value for 'llap_queue_capacity' or 'hive.server2.tez.sessions.per.default.queue' is detected, that config
+ If change in value for 'num_llap_nodes' or 'hive.server2.tez.sessions.per.default.queue' is detected, that config
value is not calculated, but read and used in the calculation of dependent configs.
+
+ Note: All memory calculations are in MB, unless specified otherwise.
"""
def updateLlapConfigs(self, configurations, services, hosts, llap_queue_name):
+ Logger.info("Entered updateLlapConfigs() ..")
putHiveInteractiveSiteProperty = self.putProperty(configurations, self.HIVE_INTERACTIVE_SITE, services)
putHiveInteractiveSitePropertyAttribute = self.putPropertyAttribute(configurations, self.HIVE_INTERACTIVE_SITE)

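The trigger list in the docstring above amounts to a membership test over services['changed-configurations']. A rough standalone sketch of that check (the real helper is are_config_props_in_changed_configs; the entry layout with 'type' and 'name' keys matches what Ambari sends):

    def props_changed(changed_configurations, config_type, prop_names):
        # Each entry looks like {'type': 'hive-interactive-env', 'name': 'num_llap_nodes', ...}
        return any(c['type'] == config_type and c['name'] in prop_names
                   for c in changed_configurations)

    def should_recalculate_llap(changed):
        # An empty list means the first (Blueprint) invocation: always recalculate.
        return (not changed) \
            or props_changed(changed, 'hive-interactive-env',
                             set(['num_llap_nodes', 'enable_hive_interactive'])) \
            or props_changed(changed, 'hive-interactive-site',
                             set(['hive.server2.tez.sessions.per.default.queue',
                                  'hive.llap.daemon.queue.name']))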
@@ -786,11 +791,16 @@ class HDP25StackAdvisor(HDP24StackAdvisor):
putTezInteractiveSiteProperty = self.putProperty(configurations, "tez-interactive-site", services)
+ putTezInteractiveSitePropertyAttribute = self.putPropertyAttribute(configurations, "tez-interactive-site") # needed later to set the 'maximum' attribute on 'tez.runtime.io.sort.mb'

llap_daemon_selected_queue_name = None
- llap_queue_selected_in_current_call = None
- LLAP_MAX_CONCURRENCY = 32 # Allow a max of 32 concurrency.
+ selected_queue_is_ambari_managed_llap = None # Queue named 'llap' at root level is Ambari managed.
+ llap_selected_queue_am_percent = None
+ DEFAULT_EXECUTOR_TO_AM_RATIO = 20
+ MIN_EXECUTOR_TO_AM_RATIO = 10
+ MAX_CONCURRENT_QUERIES = 32
+ leafQueueNames = None
+ MB_TO_BYTES = 1048576

- # Update 'hive.llap.daemon.queue.name' prop combo entries and llap capacity slider visibility.
- self.setLlapDaemonQueuePropAttributesAndCapSliderVisibility(services, configurations)
+ # Update 'hive.llap.daemon.queue.name' prop combo entries
+ self.setLlapDaemonQueuePropAttributes(services, configurations)

if not services["changed-configurations"]:
read_llap_daemon_yarn_cont_mb = long(self.get_yarn_min_container_size(services, configurations))
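The two executor-to-AM ratios introduced in this hunk bound LLAP query concurrency: by default one Tez AM is budgeted per 20 executor threads, the advertised maximum allows up to one AM per 10 executors, and both are capped at MAX_CONCURRENT_QUERIES. A quick numeric illustration with assumed figures (10 LLAP nodes, 12 executors per node):

    import math

    DEFAULT_EXECUTOR_TO_AM_RATIO = 20
    MIN_EXECUTOR_TO_AM_RATIO = 10
    MAX_CONCURRENT_QUERIES = 32

    executors = 10 * 12  # nodes * executors per node (assumed)
    default_cap = min(math.floor(float(executors) / DEFAULT_EXECUTOR_TO_AM_RATIO), MAX_CONCURRENT_QUERIES)  # 6.0
    max_cap = min(math.floor(float(executors) / MIN_EXECUTOR_TO_AM_RATIO), MAX_CONCURRENT_QUERIES)          # 12.0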
@@ -804,33 +814,58 @@ class HDP25StackAdvisor(HDP24StackAdvisor):
'hive.llap.daemon.queue.name' in services['configurations'][self.HIVE_INTERACTIVE_SITE]['properties']:
llap_daemon_selected_queue_name = services['configurations'][self.HIVE_INTERACTIVE_SITE]['properties']['hive.llap.daemon.queue.name']

- if 'hive.llap.daemon.queue.name' in configurations[self.HIVE_INTERACTIVE_SITE]['properties']:
- llap_queue_selected_in_current_call = configurations[self.HIVE_INTERACTIVE_SITE]['properties']['hive.llap.daemon.queue.name']
-
- # Update Visibility of 'llap_queue_capacity' slider.
+ # Update Visibility of 'num_llap_nodes' slider. Visible only if selected queue is Ambari created 'llap'.
capacity_scheduler_properties, received_as_key_value_pair = self.getCapacitySchedulerProperties(services)
if capacity_scheduler_properties:
# Get all leaf queues.
leafQueueNames = self.getAllYarnLeafQueues(capacity_scheduler_properties)
- if len(leafQueueNames) == 2 and \
- (llap_daemon_selected_queue_name != None and llap_daemon_selected_queue_name == llap_queue_name) or \
- (llap_queue_selected_in_current_call != None and llap_queue_selected_in_current_call == llap_queue_name):
- putHiveInteractiveEnvPropertyAttribute("llap_queue_capacity", "visible", "true")
- Logger.info("Selected YARN queue is '{0}'. Setting LLAP queue capacity slider visibility to 'True'".format(llap_queue_name))
- else:
- putHiveInteractiveEnvPropertyAttribute("llap_queue_capacity", "visible", "false")
- Logger.info("Queue selected for LLAP app is : '{0}'. Current YARN queues : {1}. Setting '{2}' queue capacity slider "
- "visibility to 'False'.".format(llap_daemon_selected_queue_name, list(leafQueueNames), llap_queue_name))
- if llap_daemon_selected_queue_name:
- llap_selected_queue_state = self.__getQueueStateFromCapacityScheduler(capacity_scheduler_properties, llap_daemon_selected_queue_name)
- if llap_selected_queue_state == None or llap_selected_queue_state == "STOPPED":
- putHiveInteractiveEnvPropertyAttribute("llap_queue_capacity", "visible", "false")
- raise Fail("Selected LLAP app queue '{0}' current state is : '{1}'. Setting LLAP configs to default values "
- "and 'llap' queue capacity slider visibility to 'False'."
- .format(llap_daemon_selected_queue_name, llap_selected_queue_state))
+ Logger.info("YARN leaf Queues = {0}".format(leafQueueNames))
+ if len(leafQueueNames) == 0:
+ raise Fail("Queue(s) couldn't be retrieved from capacity-scheduler.")
+
+ # Check if it's 1st invocation after enabling Hive Server Interactive (config: enable_hive_interactive).
+ changed_configs_has_enable_hive_int = self.are_config_props_in_changed_configs(services, "hive-interactive-env",
+ set(['enable_hive_interactive']), False)
+ llap_named_queue_selected_in_curr_invocation = None
+ if changed_configs_has_enable_hive_int \
+ and services['configurations']['hive-interactive-env']['properties']['enable_hive_interactive'] == 'true':
+ if (len(leafQueueNames) == 1 or (len(leafQueueNames) == 2 and llap_queue_name in leafQueueNames)):
+ llap_named_queue_selected_in_curr_invocation = True
+ putHiveInteractiveSiteProperty('hive.llap.daemon.queue.name', llap_queue_name)
+ putHiveInteractiveSiteProperty('hive.server2.tez.default.queues', llap_queue_name)
+ Logger.info("'hive.llap.daemon.queue.name' and 'hive.server2.tez.default.queues' values set as : {0}".format(llap_queue_name))
+ else:
+ first_leaf_queue = list(leafQueueNames)[0] # 1st invocation, pick the 1st leaf queue and set it as selected.
+ putHiveInteractiveSiteProperty('hive.llap.daemon.queue.name', first_leaf_queue)
+ putHiveInteractiveSiteProperty('hive.server2.tez.default.queues', first_leaf_queue)
+ llap_named_queue_selected_in_curr_invocation = False
+ Logger.info("'hive.llap.daemon.queue.name' and 'hive.server2.tez.default.queues' values set as : {0}".format(first_leaf_queue))
+ Logger.info("llap_named_queue_selected_in_curr_invocation = {0}".format(llap_named_queue_selected_in_curr_invocation))
+
+ if (len(leafQueueNames) == 2 and (llap_daemon_selected_queue_name != None and llap_daemon_selected_queue_name == llap_queue_name) or \
+ llap_named_queue_selected_in_curr_invocation) or \
+ (len(leafQueueNames) == 1 and llap_daemon_selected_queue_name == 'default' and llap_named_queue_selected_in_curr_invocation):
+ putHiveInteractiveEnvPropertyAttribute("num_llap_nodes", "visible", "true")
+ Logger.info("Selected YARN queue for LLAP is : '{0}'. Current YARN queues : {1}. Setting 'Number of LLAP nodes' "
+ "slider visibility to 'True'".format(llap_queue_name, list(leafQueueNames)))
+ selected_queue_is_ambari_managed_llap = True
else:
- raise Fail("Retrieved LLAP app queue name is : '{0}'. Setting LLAP configs to default values."
- .format(llap_daemon_selected_queue_name))
+ putHiveInteractiveEnvPropertyAttribute("num_llap_nodes", "visible", "false")
+ Logger.info("Selected YARN queue for LLAP is : '{0}'. Current YARN queues : {1}. Setting 'Number of LLAP nodes' "
+ "visibility to 'False'.".format(llap_daemon_selected_queue_name, list(leafQueueNames)))
+ selected_queue_is_ambari_managed_llap = False
+
+ if not llap_named_queue_selected_in_curr_invocation: # We would be creating the 'llap' queue later. Thus, cap-sched doesn't have
+ # state information pertaining to 'llap' queue.
+ # Check: State of the selected queue should not be STOPPED.
+ if llap_daemon_selected_queue_name:
+ llap_selected_queue_state = self.__getQueueStateFromCapacityScheduler(capacity_scheduler_properties, llap_daemon_selected_queue_name)
+ if llap_selected_queue_state == None or llap_selected_queue_state == "STOPPED":
+ raise Fail("Selected LLAP app queue '{0}' current state is : '{1}'. Setting LLAP configs to default "
+ "values.".format(llap_daemon_selected_queue_name, llap_selected_queue_state))
+ else:
+ raise Fail("Retrieved LLAP app queue name is : '{0}'. Setting LLAP configs to default values."
+ .format(llap_daemon_selected_queue_name))
else:
Logger.error("Couldn't retrieve 'capacity-scheduler' properties while doing YARN queue adjustment for Hive Server Interactive."
" Not calculating LLAP configs.")
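Stripped of logging, the first-invocation queue choice above reduces to a small decision: use an Ambari-managed root-level 'llap' queue (and show the node-count slider) when the cluster has only 'default', or 'default' plus 'llap'; otherwise reuse an existing leaf queue and hide the slider. A simplified sketch with plain data, no Ambari objects (note the real code takes whatever list(leafQueueNames)[0] yields, so the sorted ordering here is illustrative):

    def pick_llap_queue(leaf_queues, llap_queue_name='llap'):
        # Returns (queue_to_use, ambari_managed), mirroring the branch above.
        if len(leaf_queues) == 1 or (len(leaf_queues) == 2 and llap_queue_name in leaf_queues):
            return llap_queue_name, True      # manage a root-level 'llap' queue; slider visible
        return sorted(leaf_queues)[0], False  # reuse an existing queue; slider hidden

    # pick_llap_queue(set(['default']))          -> ('llap', True)
    # pick_llap_queue(set(['default', 'llap']))  -> ('llap', True)
    # pick_llap_queue(set(['a', 'b', 'c']))      -> ('a', False)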
@@ -840,12 +875,12 @@ class HDP25StackAdvisor(HDP24StackAdvisor):
llap_concurrency_in_changed_configs = None
llap_daemon_queue_in_changed_configs = None
# Calculations are triggered only if there is change in any one of the following props :
- # 'llap_queue_capacity', 'enable_hive_interactive', 'hive.server2.tez.sessions.per.default.queue'
+ # 'num_llap_nodes', 'enable_hive_interactive', 'hive.server2.tez.sessions.per.default.queue'
# or 'hive.llap.daemon.queue.name' has change in value selection.
# OR
# services['changed-configurations'] is empty implying that this is the Blueprint call. (1st invocation)
if 'changed-configurations' in services.keys():
- config_names_to_be_checked = set(['llap_queue_capacity', 'enable_hive_interactive'])
+ config_names_to_be_checked = set(['num_llap_nodes', 'enable_hive_interactive'])
changed_configs_in_hive_int_env = self.are_config_props_in_changed_configs(services, "hive-interactive-env",
config_names_to_be_checked, False)
@@ -863,182 +898,284 @@ class HDP25StackAdvisor(HDP24StackAdvisor):
Logger.info("Current 'changed-configuration' received is : {0}".format(services["changed-configurations"]))
return

+ Logger.info("\nPerforming LLAP config calculations ......")
node_manager_host_list = self.get_node_manager_hosts(services, hosts)
node_manager_cnt = len(node_manager_host_list)
yarn_nm_mem_in_mb = self.get_yarn_nm_mem_in_mb(services, configurations)
total_cluster_capacity = node_manager_cnt * yarn_nm_mem_in_mb
- Logger.info("\n\nCalculated total_cluster_capacity : {0}, using following : node_manager_cnt : {1}, "
+ Logger.info("Calculated total_cluster_capacity : {0}, using following : node_manager_cnt : {1}, "
"yarn_nm_mem_in_mb : {2}".format(total_cluster_capacity, node_manager_cnt, yarn_nm_mem_in_mb))

- # Check which queue is selected in 'hive.llap.daemon.queue.name', to determine current queue capacity
- current_selected_queue_for_llap_cap = None
- yarn_root_queues = capacity_scheduler_properties.get("yarn.scheduler.capacity.root.queues")
- if llap_queue_selected_in_current_call == llap_queue_name \
- or llap_daemon_selected_queue_name == llap_queue_name \
- and (llap_queue_name in yarn_root_queues and len(leafQueueNames) == 2):
- current_selected_queue_for_llap_cap_perc = self.get_llap_cap_percent_slider(services, configurations)
- current_selected_queue_for_llap_cap = current_selected_queue_for_llap_cap_perc / 100 * total_cluster_capacity
- else: # any queue other than 'llap'
- current_selected_queue_for_llap_cap = self.__getSelectedQueueTotalCap(capacity_scheduler_properties,
- llap_daemon_selected_queue_name, total_cluster_capacity)
- assert (current_selected_queue_for_llap_cap >= 1), "Current selected queue '{0}' capacity value : {1}. Expected value : >= 1" \
- .format(llap_daemon_selected_queue_name, current_selected_queue_for_llap_cap)
yarn_min_container_size = self.get_yarn_min_container_size(services, configurations)
- tez_am_container_size = self.calculate_tez_am_container_size(long(total_cluster_capacity))
+
+ tez_am_container_size = self.calculate_tez_am_container_size(services, long(total_cluster_capacity))
normalized_tez_am_container_size = self._normalizeUp(tez_am_container_size, yarn_min_container_size)
+ cpu_per_nm_host = self.get_cpu_per_nm_host(services)
Logger.info("Calculated normalized_tez_am_container_size : {0}, using following : tez_am_container_size : {1}, "
"total_cluster_capacity : {2}".format(normalized_tez_am_container_size, tez_am_container_size,
total_cluster_capacity))
- normalized_selected_queue_for_llap_cap = long(self._normalizeDown(current_selected_queue_for_llap_cap, yarn_min_container_size))
+
+ # Calculate the available memory for LLAP app
+ yarn_nm_mem_in_mb_normalized = self._normalizeDown(yarn_nm_mem_in_mb, yarn_min_container_size)
+ mem_per_thread_for_llap = self.calculate_mem_per_thread_for_llap(services, yarn_nm_mem_in_mb_normalized, cpu_per_nm_host)
+ Logger.info("Calculated mem_per_thread_for_llap : {0}, using following: yarn_nm_mem_in_mb_normalized : {1}, "
+ "cpu_per_nm_host : {2}".format(mem_per_thread_for_llap, yarn_nm_mem_in_mb_normalized, cpu_per_nm_host))
+
+ Logger.info("selected_queue_is_ambari_managed_llap = {0}".format(selected_queue_is_ambari_managed_llap))
+ if not selected_queue_is_ambari_managed_llap:
+ llap_daemon_selected_queue_cap = self.__getSelectedQueueTotalCap(capacity_scheduler_properties, llap_daemon_selected_queue_name, total_cluster_capacity)
+ assert (llap_daemon_selected_queue_cap > 0), "'{0}' queue capacity percentage retrieved = {1}. " \
+ "Expected > 0.".format(llap_daemon_selected_queue_name, llap_daemon_selected_queue_cap)
+ total_llap_mem_normalized = self._normalizeDown(llap_daemon_selected_queue_cap, yarn_min_container_size)
+ Logger.info("Calculated '{0}' queue available capacity : {1}, using following: llap_daemon_selected_queue_cap : {2}, "
+ "yarn_min_container_size : {3}".format(llap_daemon_selected_queue_name, total_llap_mem_normalized,
+ llap_daemon_selected_queue_cap, yarn_min_container_size))
+ num_llap_nodes_requested = math.floor(total_llap_mem_normalized / yarn_nm_mem_in_mb_normalized)
+ Logger.info("Calculated 'num_llap_nodes_requested' : {0}, using following: total_llap_mem_normalized : {1}, "
+ "yarn_nm_mem_in_mb_normalized : {2}".format(num_llap_nodes_requested, total_llap_mem_normalized, yarn_nm_mem_in_mb_normalized))
+ queue_am_fraction_perc = float(self.__getQueueAmFractionFromCapacityScheduler(capacity_scheduler_properties, llap_daemon_selected_queue_name))
+ hive_tez_am_cap_available = queue_am_fraction_perc * total_llap_mem_normalized
+ Logger.info("Calculated 'hive_tez_am_cap_available' : {0}, using following: queue_am_fraction_perc : {1}, "
+ "total_llap_mem_normalized : {2}".format(hive_tez_am_cap_available, queue_am_fraction_perc, total_llap_mem_normalized))
+ else: # Ambari managed 'llap' named queue at root level.
+ num_llap_nodes_requested = self.get_num_llap_nodes(services, configurations) # Input
+ total_llap_mem = num_llap_nodes_requested * yarn_nm_mem_in_mb_normalized
+ Logger.info("Calculated 'total_llap_mem' : {0}, using following: num_llap_nodes_requested : {1}, "
+ "yarn_nm_mem_in_mb_normalized : {2}".format(total_llap_mem, num_llap_nodes_requested, yarn_nm_mem_in_mb_normalized))
+ total_llap_mem_normalized = float(self._normalizeDown(total_llap_mem, yarn_min_container_size))
+ Logger.info("Calculated 'total_llap_mem_normalized' : {0}, using following: total_llap_mem : {1}, "
+ "yarn_min_container_size : {2}".format(total_llap_mem_normalized, total_llap_mem, yarn_min_container_size))
+ # What percent is 'total_llap_mem' of 'total_cluster_capacity' ?
+ llap_named_queue_cap_fraction = math.ceil(total_llap_mem_normalized / total_cluster_capacity * 100)
+ assert (llap_named_queue_cap_fraction <= 100), "Calculated '{0}' queue size = {1}. Cannot be > 100.".format(llap_queue_name, llap_named_queue_cap_fraction)
+ Logger.info("Calculated '{0}' queue capacity percent = {1}.".format(llap_queue_name, llap_named_queue_cap_fraction))
+ # Adjust capacity scheduler for the 'llap' named queue.
+ self.checkAndManageLlapQueue(services, configurations, hosts, llap_queue_name, llap_named_queue_cap_fraction)
+ hive_tez_am_cap_available = total_llap_mem_normalized
+ Logger.info("hive_tez_am_cap_available : {0}".format(hive_tez_am_cap_available))
+
+ # Common calculations now, irrespective of the queue selected.

# Get calculated value for Slider AM container Size
slider_am_container_size = self._normalizeUp(self.calculate_slider_am_size(yarn_min_container_size),
yarn_min_container_size)
+ Logger.info("Calculated 'slider_am_container_size' : {0}, using following: yarn_min_container_size : "
+ "{1}".format(slider_am_container_size, yarn_min_container_size))
+
+ llap_mem_for_tezAm_and_daemons = total_llap_mem_normalized - slider_am_container_size
+ assert (llap_mem_for_tezAm_and_daemons >= 2 * yarn_min_container_size), "Not enough capacity available on the cluster to run LLAP"
+ Logger.info("Calculated 'llap_mem_for_tezAm_and_daemons' : {0}, using following : total_llap_mem_normalized : {1}, "
+ "slider_am_container_size : {2}".format(llap_mem_for_tezAm_and_daemons, total_llap_mem_normalized, slider_am_container_size))
+
+ # Calculate llap concurrency (i.e. Number of Tez AM's)
+ max_executors_per_node = self.get_max_executors_per_node(yarn_nm_mem_in_mb_normalized, cpu_per_nm_host, mem_per_thread_for_llap)

# Read 'hive.server2.tez.sessions.per.default.queue' prop if it's in changed-configs, else calculate it.
if not llap_concurrency_in_changed_configs:
- # Calculate llap concurrency (i.e. Number of Tez AM's)
- llap_concurrency = float(normalized_selected_queue_for_llap_cap * 0.25 / normalized_tez_am_container_size)
- llap_concurrency = max(long(llap_concurrency), 1)
- Logger.info("Calculated llap_concurrency : {0}, using following : normalized_selected_queue_for_llap_cap : {1}, "
- "normalized_tez_am_container_size : {2}".format(llap_concurrency, normalized_selected_queue_for_llap_cap,
- normalized_tez_am_container_size))
- # Limit 'llap_concurrency' to reach a max. of 32.
- if llap_concurrency > LLAP_MAX_CONCURRENCY:
- llap_concurrency = LLAP_MAX_CONCURRENCY
+ assert (max_executors_per_node > 0), "Calculated 'max_executors_per_node' = {0}. Expected value >= 1.".format(max_executors_per_node)
+ Logger.info("Calculated 'max_executors_per_node' : {0}, using following: yarn_nm_mem_in_mb_normalized : {1}, cpu_per_nm_host : {2}, "
+ "mem_per_thread_for_llap: {3}".format(max_executors_per_node, yarn_nm_mem_in_mb_normalized, cpu_per_nm_host, mem_per_thread_for_llap))
+ # Default 1 AM for every 20 executor threads.
+ # The second part of the min calculates based on mem required for DEFAULT_EXECUTOR_TO_AM_RATIO executors + 1 AM,
+ # making use of total memory. However, it's possible that total memory will not be used - and the numExecutors is
+ # instead limited by #CPUs. Use maxPerNode to factor this in.
+ llap_concurreny_limit = min(math.floor(max_executors_per_node * num_llap_nodes_requested / DEFAULT_EXECUTOR_TO_AM_RATIO), MAX_CONCURRENT_QUERIES)
+ Logger.info("Calculated 'llap_concurreny_limit' : {0}, using following : max_executors_per_node : {1}, num_llap_nodes_requested : {2}, DEFAULT_EXECUTOR_TO_AM_RATIO "
+ ": {3}, MAX_CONCURRENT_QUERIES : {4}".format(llap_concurreny_limit, max_executors_per_node, num_llap_nodes_requested, DEFAULT_EXECUTOR_TO_AM_RATIO, MAX_CONCURRENT_QUERIES))
+ llap_concurrency = min(llap_concurreny_limit, math.floor(llap_mem_for_tezAm_and_daemons / (DEFAULT_EXECUTOR_TO_AM_RATIO * mem_per_thread_for_llap + normalized_tez_am_container_size)))
+ Logger.info("Calculated 'llap_concurrency' : {0}, using following : llap_concurreny_limit : {1}, llap_mem_for_tezAm_and_daemons : "
+ "{2}, DEFAULT_EXECUTOR_TO_AM_RATIO : {3}, mem_per_thread_for_llap : {4}, normalized_tez_am_container_size : "
+ "{5}".format(llap_concurrency, llap_concurreny_limit, llap_mem_for_tezAm_and_daemons, DEFAULT_EXECUTOR_TO_AM_RATIO,
+ mem_per_thread_for_llap, normalized_tez_am_container_size))
+ if (llap_concurrency == 0):
+ llap_concurrency = 1
+ Logger.info("Adjusted 'llap_concurrency' : 1.")
+
+ if (llap_concurrency * normalized_tez_am_container_size > hive_tez_am_cap_available):
+ llap_concurrency = math.floor(hive_tez_am_cap_available / normalized_tez_am_container_size)
+ assert (llap_concurrency > 0), "Calculated 'LLAP Concurrent Queries' = {0}. Expected value >= 1.".format(llap_concurrency)
+ Logger.info("Adjusted 'llap_concurrency' : {0}, using following: hive_tez_am_cap_available : {1}, normalized_tez_am_container_size: "
+ "{2}".format(llap_concurrency, hive_tez_am_cap_available, normalized_tez_am_container_size))
else:
# Read current value
if 'hive.server2.tez.sessions.per.default.queue' in services['configurations'][self.HIVE_INTERACTIVE_SITE][
'properties']:
llap_concurrency = long(services['configurations'][self.HIVE_INTERACTIVE_SITE]['properties'][
'hive.server2.tez.sessions.per.default.queue'])
- assert (
- llap_concurrency >= 1), "'hive.server2.tez.sessions.per.default.queue' current value : {0}. Expected value : >= 1" \
+ assert (llap_concurrency >= 1), "'hive.server2.tez.sessions.per.default.queue' current value : {0}. Expected value : >= 1" \
.format(llap_concurrency)
+ Logger.info("Read 'llap_concurrency' : {0}".format(llap_concurrency))
else:
raise Fail(
"Couldn't retrieve Hive Server interactive's 'hive.server2.tez.sessions.per.default.queue' config.")

-
- # Calculate 'total memory available for llap daemons' across cluster
- total_am_capacity_required = normalized_tez_am_container_size * llap_concurrency + slider_am_container_size
- cap_available_for_daemons = normalized_selected_queue_for_llap_cap - total_am_capacity_required
- Logger.info(
- "Calculated cap_available_for_daemons : {0}, using following : current_selected_queue_for_llap_cap : {1}, "
- "yarn_nm_mem_in_mb : {2}, total_cluster_capacity : {3}, normalized_selected_queue_for_llap_cap : {4}, normalized_tez_am_container_size"
- " : {5}, yarn_min_container_size : {6}, llap_concurrency : {7}, total_am_capacity_required : {8}"
- .format(cap_available_for_daemons, current_selected_queue_for_llap_cap, yarn_nm_mem_in_mb,
- total_cluster_capacity,
- normalized_selected_queue_for_llap_cap, normalized_tez_am_container_size, yarn_min_container_size, llap_concurrency,
- total_am_capacity_required))
- if cap_available_for_daemons < yarn_min_container_size:
- raise Fail(
- "'Capacity available for LLAP daemons'({0}) < 'YARN minimum container size'({1}). Invalid configuration detected. "
- "Increase LLAP queue size.".format(cap_available_for_daemons, yarn_min_container_size))
-
+ # Calculate 'Max LLAP Concurrency', irrespective of whether 'llap_concurrency' was read or calculated.
+ max_llap_concurreny_limit = min(math.floor(max_executors_per_node * num_llap_nodes_requested / MIN_EXECUTOR_TO_AM_RATIO), MAX_CONCURRENT_QUERIES)
+ Logger.info("Calculated 'max_llap_concurreny_limit' : {0}, using following : max_executors_per_node : {1}, num_llap_nodes_requested "
+ ": {2}, MIN_EXECUTOR_TO_AM_RATIO : {3}, MAX_CONCURRENT_QUERIES : {4}".format(max_llap_concurreny_limit, max_executors_per_node,
+ num_llap_nodes_requested, MIN_EXECUTOR_TO_AM_RATIO,
+ MAX_CONCURRENT_QUERIES))
+ max_llap_concurreny = min(max_llap_concurreny_limit, math.floor(llap_mem_for_tezAm_and_daemons / (MIN_EXECUTOR_TO_AM_RATIO *
+ mem_per_thread_for_llap + normalized_tez_am_container_size)))
+ Logger.info("Calculated 'max_llap_concurreny' : {0}, using following : max_llap_concurreny_limit : {1}, llap_mem_for_tezAm_and_daemons : "
+ "{2}, MIN_EXECUTOR_TO_AM_RATIO : {3}, mem_per_thread_for_llap : {4}, normalized_tez_am_container_size : "
+ "{5}".format(max_llap_concurreny, max_llap_concurreny_limit, llap_mem_for_tezAm_and_daemons, MIN_EXECUTOR_TO_AM_RATIO,
+ mem_per_thread_for_llap, normalized_tez_am_container_size))
+ if (max_llap_concurreny == 0):
+ max_llap_concurreny = 1
+ Logger.info("Adjusted 'max_llap_concurreny' : 1.")
+
+ if (max_llap_concurreny * normalized_tez_am_container_size > hive_tez_am_cap_available):
+ max_llap_concurreny = math.floor(hive_tez_am_cap_available / normalized_tez_am_container_size)
+ assert (max_llap_concurreny > 0), "Calculated 'Max. LLAP Concurrent Queries' = {0}. Expected value >= 1.".format(max_llap_concurreny)
+ Logger.info("Adjusted 'max_llap_concurreny' : {0}, using following: hive_tez_am_cap_available : {1}, normalized_tez_am_container_size: "
+ "{2}".format(max_llap_concurreny, hive_tez_am_cap_available, normalized_tez_am_container_size))

# Calculate value for 'num_llap_nodes', an across cluster config.
- # Also, get calculated value for 'hive.llap.daemon.yarn.container.mb' based on 'num_llap_nodes' value, a per node config.
- num_llap_nodes_raw = cap_available_for_daemons / yarn_nm_mem_in_mb
- if num_llap_nodes_raw < 1.00:
- # Set the llap nodes to min. value of 1 and 'llap_container_size' to min. YARN allocation.
- num_llap_nodes = 1
- llap_container_size = self._normalizeUp(cap_available_for_daemons, yarn_min_container_size)
- Logger.info("Calculated llap_container_size : {0}, using following : cap_available_for_daemons : {1}, "
- "yarn_min_container_size : {2}".format(llap_container_size, cap_available_for_daemons,
- yarn_min_container_size))
- else:
- num_llap_nodes = math.floor(num_llap_nodes_raw)
- llap_container_size = self._normalizeDown(yarn_nm_mem_in_mb, yarn_min_container_size)
- Logger.info("Calculated llap_container_size : {0}, using following : yarn_nm_mem_in_mb : {1}, "
- "yarn_min_container_size : {2}".format(llap_container_size, yarn_nm_mem_in_mb,
- yarn_min_container_size))
- Logger.info(
- "Calculated num_llap_nodes : {0} using following : yarn_nm_mem_in_mb : {1}, cap_available_for_daemons : {2} " \
- .format(num_llap_nodes, yarn_nm_mem_in_mb, cap_available_for_daemons))
-
-
- # Calculate value for 'hive.llap.daemon.num.executors', a per node config.
- hive_tez_container_size = self.get_hive_tez_container_size(services, configurations)
- if 'yarn.nodemanager.resource.cpu-vcores' in services['configurations']['yarn-site']['properties']:
- cpu_per_nm_host = float(services['configurations']['yarn-site']['properties'][
- 'yarn.nodemanager.resource.cpu-vcores'])
- assert (cpu_per_nm_host > 0), "'yarn.nodemanager.resource.cpu-vcores' current value : {0}. Expected value : > 0" \
- .format(cpu_per_nm_host)
+ tez_am_memory_required = llap_concurrency * normalized_tez_am_container_size
+ Logger.info("Calculated 'tez_am_memory_required' : {0}, using following : llap_concurrency : {1}, normalized_tez_am_container_size : "
+ "{2}".format(tez_am_memory_required, llap_concurrency, normalized_tez_am_container_size))
+ llap_mem_daemon_size = llap_mem_for_tezAm_and_daemons - tez_am_memory_required
+ assert (llap_mem_daemon_size >= yarn_min_container_size), "Calculated 'LLAP Daemon Size = {0}'. Expected >= 'YARN Minimum Container " \
+ "Size' ({1})'".format(llap_mem_daemon_size, yarn_min_container_size)
+ assert (llap_mem_daemon_size >= mem_per_thread_for_llap or llap_mem_daemon_size >= yarn_min_container_size), "Not enough memory available for executors."
+ Logger.info("Calculated 'llap_mem_daemon_size' : {0}, using following : llap_mem_for_tezAm_and_daemons : {1}, tez_am_memory_required : "
+ "{2}".format(llap_mem_daemon_size, llap_mem_for_tezAm_and_daemons, tez_am_memory_required))
+
+ llap_daemon_mem_per_node = self._normalizeDown(llap_mem_daemon_size / num_llap_nodes_requested, yarn_min_container_size)
+ Logger.info("Calculated 'llap_daemon_mem_per_node' : {0}, using following : llap_mem_daemon_size : {1}, num_llap_nodes_requested : {2}, "
+ "yarn_min_container_size: {3}".format(llap_daemon_mem_per_node, llap_mem_daemon_size, num_llap_nodes_requested, yarn_min_container_size))
+ if (llap_daemon_mem_per_node == 0):
+ # Small cluster. No capacity left on a node after running AMs.
+ llap_daemon_mem_per_node = mem_per_thread_for_llap
+ num_llap_nodes = math.floor(llap_mem_daemon_size / mem_per_thread_for_llap)
+ Logger.info("'llap_daemon_mem_per_node' : 0, adjusted 'llap_daemon_mem_per_node' : {0}, 'num_llap_nodes' : {1}, using following: llap_mem_daemon_size : {2}, "
+ "mem_per_thread_for_llap : {3}".format(llap_daemon_mem_per_node, num_llap_nodes, llap_mem_daemon_size, mem_per_thread_for_llap))
+ elif (llap_daemon_mem_per_node < mem_per_thread_for_llap):
+ # Previously computed value of memory per thread may be too high. Cut the number of nodes. (Alternately reduce memory per node)
+ llap_daemon_mem_per_node = mem_per_thread_for_llap
+ num_llap_nodes = math.floor(llap_mem_daemon_size / mem_per_thread_for_llap)
+ Logger.info("'llap_daemon_mem_per_node'({0}) < mem_per_thread_for_llap({1}), adjusted 'llap_daemon_mem_per_node' "
+ ": {2}".format(llap_daemon_mem_per_node, mem_per_thread_for_llap, llap_daemon_mem_per_node))
else:
- raise Fail("Couldn't retrieve YARN's 'yarn.nodemanager.resource.cpu-vcores' config.")
-
- num_executors_per_node_raw = math.floor(llap_container_size / hive_tez_container_size)
- num_executors_per_node = min(num_executors_per_node_raw, cpu_per_nm_host)
- Logger.info("calculated num_executors_per_node: {0}, using following : hive_tez_container_size : {1}, "
- "cpu_per_nm_host : {2}, num_executors_per_node_raw : {3}, llap_container_size : {4}"
- .format(num_executors_per_node, hive_tez_container_size, cpu_per_nm_host, num_executors_per_node_raw,
- llap_container_size))
- assert (num_executors_per_node >= 0), "'Number of executors per node' : {0}. Expected value : > 0".format(
- num_executors_per_node)
-
- total_mem_for_executors = num_executors_per_node * hive_tez_container_size
-
- # Calculate value for 'cache' (hive.llap.io.memory.size), a per node config.
- cache_size_per_node = llap_container_size - total_mem_for_executors
- Logger.info(
- "Calculated cache_size_per_node : {0} using following : hive_container_size : {1}, llap_container_size"
- " : {2}, num_executors_per_node : {3}"
- .format(cache_size_per_node, hive_tez_container_size, llap_container_size, num_executors_per_node))
- if cache_size_per_node < 0: # Run with '0' cache.
- Logger.info(
- "Calculated 'cache_size_per_node' : {0}. Setting 'cache_size_per_node' to 0.".format(cache_size_per_node))
- cache_size_per_node = 0
-
+ # All good. We have a proper value for memoryPerNode.
+ num_llap_nodes = num_llap_nodes_requested
+ Logger.info("num_llap_nodes : {0}".format(num_llap_nodes))
+
+ num_executors_per_node_max = self.get_max_executors_per_node(yarn_nm_mem_in_mb_normalized, cpu_per_nm_host, mem_per_thread_for_llap)
+ assert (num_executors_per_node_max >= 1), "Calculated 'Max. Executors per Node' = {0}. Expected value >= 1.".format(num_executors_per_node_max)
+ Logger.info("Calculated 'num_executors_per_node_max' : {0}, using following : yarn_nm_mem_in_mb_normalized : {1}, cpu_per_nm_host : {2}, "
+ "mem_per_thread_for_llap: {3}".format(num_executors_per_node_max, yarn_nm_mem_in_mb_normalized, cpu_per_nm_host, mem_per_thread_for_llap))
+
+ # NumExecutorsPerNode is not necessarily max - since some capacity would have been reserved for AMs, if this value were based on mem.
+ num_executors_per_node = min(math.floor(llap_daemon_mem_per_node / mem_per_thread_for_llap), num_executors_per_node_max)
+ assert (num_executors_per_node > 0), "Calculated 'Number of Executors Per Node' = {0}. Expected value >= 1".format(num_executors_per_node)
+ Logger.info("Calculated 'num_executors_per_node' : {0}, using following : llap_daemon_mem_per_node : {1}, num_executors_per_node_max : {2}, "
+ "mem_per_thread_for_llap: {3}".format(num_executors_per_node, llap_daemon_mem_per_node, num_executors_per_node_max, mem_per_thread_for_llap))
+
+ # Now figure out how much of the memory will be used by the executors, and how much will be used by the cache.
+ total_mem_for_executors_per_node = num_executors_per_node * mem_per_thread_for_llap
+ cache_mem_per_node = llap_daemon_mem_per_node - total_mem_for_executors_per_node
+
+ tez_runtime_io_sort_mb = long((0.8 * mem_per_thread_for_llap) / 3)
+ tez_runtime_unordered_output_buffer_size = long(0.8 * 0.075 * mem_per_thread_for_llap)
+ # 'hive_auto_convert_join_noconditionaltask_size' value is in bytes. Thus, multiplying by MB_TO_BYTES (1048576).
+ hive_auto_convert_join_noconditionaltask_size = long((0.8 * mem_per_thread_for_llap) / 3) * MB_TO_BYTES

# Calculate value for prop 'llap_heap_size'
- llap_xmx = max(total_mem_for_executors * 0.8, total_mem_for_executors - self.get_llap_headroom_space(services, configurations))
- Logger.info("Calculated llap_app_heap_size : {0}, using following : hive_container_size : {1}, "
- "total_mem_for_executors : {2}".format(llap_xmx, hive_tez_container_size, total_mem_for_executors))
+ llap_xmx = max(total_mem_for_executors_per_node * 0.8, total_mem_for_executors_per_node - self.get_llap_headroom_space(services, configurations))
+ Logger.info("Calculated llap_app_heap_size : {0}, using following : total_mem_for_executors : {1}".format(llap_xmx, total_mem_for_executors_per_node))
+
+ # Calculate 'hive_heapsize' for Hive2/HiveServer2 (HSI)
+ hive_server_interactive_heapsize = None
+ hive_server_interactive_hosts = self.getHostsWithComponent("HIVE", "HIVE_SERVER_INTERACTIVE", services, hosts)
+ if hive_server_interactive_hosts is None:
+ # If it's None, read the base service HDFS's DATANODE node memory, as all hosts are considered homogeneous.
+ hive_server_interactive_hosts = self.getHostsWithComponent("HDFS", "DATANODE", services, hosts)
+ if hive_server_interactive_hosts is not None and len(hive_server_interactive_hosts) > 0:
+ host_mem = long(hive_server_interactive_hosts[0]["Hosts"]["total_mem"])
+ hive_server_interactive_heapsize = min(max(2048.0, 400.0*llap_concurrency), 3.0/8 * host_mem)
+ Logger.info("Calculated 'hive_server_interactive_heapsize' : {0}, using following : llap_concurrency : {1}, host_mem : "
+ "{2}".format(hive_server_interactive_heapsize, llap_concurrency, host_mem))

- # Updating calculated configs.
+ Logger.info("Updating the calculations....")
+
+ # Done with calculations, updating calculated configs.
+
normalized_tez_am_container_size = long(normalized_tez_am_container_size)
putTezInteractiveSiteProperty('tez.am.resource.memory.mb', normalized_tez_am_container_size)
- Logger.info("'Tez for Hive2' config 'tez.am.resource.memory.mb' updated. Current: {0}".format(
- normalized_tez_am_container_size))
+ Logger.info("'Tez for Hive2' config 'tez.am.resource.memory.mb' updated. Current: {0}".format(normalized_tez_am_container_size))

if not llap_concurrency_in_changed_configs:
min_llap_concurrency = 1
putHiveInteractiveSiteProperty('hive.server2.tez.sessions.per.default.queue', llap_concurrency)
putHiveInteractiveSitePropertyAttribute('hive.server2.tez.sessions.per.default.queue', "minimum",
min_llap_concurrency)
- putHiveInteractiveSitePropertyAttribute('hive.server2.tez.sessions.per.default.queue', "maximum",
- LLAP_MAX_CONCURRENCY)
- Logger.info(
- "Hive2 config 'hive.server2.tez.sessions.per.default.queue' updated. Min : {0}, Current: {1}, Max: {2}" \
- .format(min_llap_concurrency, llap_concurrency, LLAP_MAX_CONCURRENCY))

- num_llap_nodes = long(num_llap_nodes)
+ Logger.info("Hive2 config 'hive.server2.tez.sessions.per.default.queue' updated. Min : {0}, Current: {1}" \
+ .format(min_llap_concurrency, llap_concurrency))

- putHiveInteractiveEnvProperty('num_llap_nodes', num_llap_nodes)
- Logger.info("LLAP config 'num_llap_nodes' updated. Current: {0}".format(num_llap_nodes))
+ putHiveInteractiveSitePropertyAttribute('hive.server2.tez.sessions.per.default.queue', "maximum", max_llap_concurreny)
+ Logger.info("Hive2 config 'hive.server2.tez.sessions.per.default.queue' updated. Max : {0}".format(max_llap_concurreny))

- llap_container_size = long(llap_container_size)
+ num_llap_nodes = long(num_llap_nodes)
+ putHiveInteractiveEnvPropertyAttribute('num_llap_nodes', "minimum", 1)
+ putHiveInteractiveEnvPropertyAttribute('num_llap_nodes', "maximum", node_manager_cnt)
+ if (num_llap_nodes != num_llap_nodes_requested):
+ Logger.info("User requested num_llap_nodes : {0}, but used/adjusted value for calculations is : {1}".format(num_llap_nodes_requested, num_llap_nodes))
+ else:
+ Logger.info("Used num_llap_nodes for calculations : {0}".format(num_llap_nodes_requested))
+ Logger.info("LLAP config 'num_llap_nodes' updated. Min: 1, Max: {0}".format(node_manager_cnt))
+
+ llap_container_size = long(llap_daemon_mem_per_node)
putHiveInteractiveSiteProperty('hive.llap.daemon.yarn.container.mb', llap_container_size)
Logger.info("LLAP config 'hive.llap.daemon.yarn.container.mb' updated. Current: {0}".format(llap_container_size))

+ # Set 'hive.tez.container.size' only if it is read as "SET_ON_FIRST_INVOCATION", implying initialization.
+ # Else, we don't override (1). a previously calculated value or (2). a user-provided value.
+ if self.get_hive_tez_container_size(services) == self.CONFIG_VALUE_UINITIALIZED:
+ mem_per_thread_for_llap = long(mem_per_thread_for_llap)
+ putHiveInteractiveSiteProperty('hive.tez.container.size', mem_per_thread_for_llap)
+ Logger.info("LLAP config 'hive.tez.container.size' updated. Current: {0}".format(mem_per_thread_for_llap))
+
+ putTezInteractiveSiteProperty('tez.runtime.io.sort.mb', tez_runtime_io_sort_mb)
+ if "tez-site" in services["configurations"] and "tez.runtime.sorter.class" in services["configurations"]["tez-site"]["properties"]:
+ if services["configurations"]["tez-site"]["properties"]["tez.runtime.sorter.class"] == "LEGACY":
+ putTezInteractiveSitePropertyAttribute("tez.runtime.io.sort.mb", "maximum", 1800)
+ Logger.info("'Tez for Hive2' config 'tez.runtime.io.sort.mb' updated. Current: {0}".format(tez_runtime_io_sort_mb))
+
+ putTezInteractiveSiteProperty('tez.runtime.unordered.output.buffer.size-mb', tez_runtime_unordered_output_buffer_size)
+ Logger.info("'Tez for Hive2' config 'tez.runtime.unordered.output.buffer.size-mb' updated. Current: {0}".format(tez_runtime_unordered_output_buffer_size))
+
+ putHiveInteractiveSiteProperty('hive.auto.convert.join.noconditionaltask.size', hive_auto_convert_join_noconditionaltask_size)
+ Logger.info("HIVE2 config 'hive.auto.convert.join.noconditionaltask.size' updated. Current: {0}".format(hive_auto_convert_join_noconditionaltask_size))
+
num_executors_per_node = long(num_executors_per_node)
putHiveInteractiveSiteProperty('hive.llap.daemon.num.executors', num_executors_per_node)
- Logger.info("LLAP config 'hive.llap.daemon.num.executors' updated. Current: {0}".format(num_executors_per_node))
+ putHiveInteractiveSitePropertyAttribute('hive.llap.daemon.num.executors', "minimum", 1)
+ putHiveInteractiveSitePropertyAttribute('hive.llap.daemon.num.executors', "maximum", int(num_executors_per_node_max))
+ Logger.info("LLAP config 'hive.llap.daemon.num.executors' updated. Current: {0}, Min: 1, "
+ "Max: {1}".format(num_executors_per_node, int(num_executors_per_node_max)))
# 'hive.llap.io.threadpool.size' config value is to be set same as value calculated for
# 'hive.llap.daemon.num.executors' at all times.
putHiveInteractiveSiteProperty('hive.llap.io.threadpool.size', num_executors_per_node)
Logger.info("LLAP config 'hive.llap.io.threadpool.size' updated. Current: {0}".format(num_executors_per_node))

- cache_size_per_node = long(cache_size_per_node)
- putHiveInteractiveSiteProperty('hive.llap.io.memory.size', cache_size_per_node)
- Logger.info("LLAP config 'hive.llap.io.memory.size' updated. Current: {0}".format(cache_size_per_node))
+ cache_mem_per_node = long(cache_mem_per_node)
+ putHiveInteractiveSiteProperty('hive.llap.io.memory.size', cache_mem_per_node)
+ Logger.info("LLAP config 'hive.llap.io.memory.size' updated. Current: {0}".format(cache_mem_per_node))
llap_io_enabled = 'false'
- if cache_size_per_node >= 64:
+ if cache_mem_per_node >= 64:
llap_io_enabled = 'true'

+ if hive_server_interactive_heapsize != None:
+ putHiveInteractiveEnvProperty("hive_heapsize", int(hive_server_interactive_heapsize))
+ Logger.info("Hive2 config 'hive_heapsize' updated. Current : {0}".format(int(hive_server_interactive_heapsize)))
+
putHiveInteractiveSiteProperty('hive.llap.io.enabled', llap_io_enabled)
Logger.info("Hive2 config 'hive.llap.io.enabled' updated to '{0}' as part of "
"'hive.llap.io.memory.size' calculation.".format(llap_io_enabled))
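To make the chain of calculations in the hunk above concrete, here is a worked example under assumed inputs: 3 NodeManagers with 24576 MB and 8 vcores each, a 1024 MB minimum allocation, the Ambari-managed 'llap' queue spanning all 3 nodes, and a Slider AM that normalizes to 1024 MB. The numbers follow the formulas above rather than the Ambari code paths themselves:

    import math

    yarn_min, nm_mem, cpus, nodes = 1024.0, 24576.0, 8, 3
    nm_mem_norm = math.floor(nm_mem / yarn_min) * yarn_min                   # 24576
    mem_per_thread = 3072.0                    # calculate_mem_per_thread_for_llap tier for <= 24576
    tez_am_norm = math.ceil(512.0 / yarn_min) * yarn_min                     # 512 MB AM (73728 MB tier) -> 1024

    max_exec_per_node = min(math.floor(nm_mem_norm / mem_per_thread), cpus)  # 8
    total_llap_mem = nodes * nm_mem_norm                                     # 73728
    mem_for_ams_and_daemons = total_llap_mem - 1024.0                        # 72704, after the Slider AM

    concurrency = min(math.floor(max_exec_per_node * nodes / 20.0), 32,
                      math.floor(mem_for_ams_and_daemons / (20 * mem_per_thread + tez_am_norm)))
    concurrency = max(concurrency, 1)                                        # 1

    daemon_mem_total = mem_for_ams_and_daemons - concurrency * tez_am_norm   # 71680
    daemon_mem_per_node = math.floor(daemon_mem_total / nodes / yarn_min) * yarn_min               # 23552
    executors_per_node = min(math.floor(daemon_mem_per_node / mem_per_thread), max_exec_per_node)  # 7
    cache_per_node = daemon_mem_per_node - executors_per_node * mem_per_thread                     # 2048
    # cache_per_node >= 64, so 'hive.llap.io.enabled' would be recommended as 'true'.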
@@ -1066,7 +1203,7 @@ class HDP25StackAdvisor(HDP24StackAdvisor):

putHiveInteractiveSiteProperty('hive.server2.tez.sessions.per.default.queue', 1)
putHiveInteractiveSitePropertyAttribute('hive.server2.tez.sessions.per.default.queue', "minimum", 1)
- putHiveInteractiveSitePropertyAttribute('hive.server2.tez.sessions.per.default.queue', "maximum", 32)
+ putHiveInteractiveSitePropertyAttribute('hive.server2.tez.sessions.per.default.queue', "maximum", 1)

putHiveInteractiveEnvProperty('num_llap_nodes', 0)
putHiveInteractiveEnvPropertyAttribute('num_llap_nodes', "minimum", 1)
@@ -1135,57 +1272,84 @@ class HDP25StackAdvisor(HDP24StackAdvisor):
return node_manager_hosts

"""
- Returns the current LLAP queue capacity percentage value. (llap_queue_capacity)
+ Returns current value of number of LLAP nodes in cluster (num_llap_nodes)
"""
- def get_llap_cap_percent_slider(self, services, configurations):
- llap_slider_cap_percentage = 0
- if 'llap_queue_capacity' in services['configurations']['hive-interactive-env']['properties']:
- llap_slider_cap_percentage = float(
- services['configurations']['hive-interactive-env']['properties']['llap_queue_capacity'])
- Logger.error("'llap_queue_capacity' not present in services['configurations']['hive-interactive-env']['properties'].")
- if llap_slider_cap_percentage <= 0 :
- if 'hive-interactive-env' in configurations and \
- 'llap_queue_capacity' in configurations["hive-interactive-env"]["properties"]:
- llap_slider_cap_percentage = float(configurations["hive-interactive-env"]["properties"]["llap_queue_capacity"])
- assert (llap_slider_cap_percentage > 0), "'llap_queue_capacity' is set to : {0}. Should be > 0.".format(llap_slider_cap_percentage)
- return llap_slider_cap_percentage
+ def get_num_llap_nodes(self, services, configurations):
+ num_llap_nodes = None
+ # Check if 'num_llap_nodes' is modified in current ST invocation.
+ if 'hive-interactive-env' in configurations and 'num_llap_nodes' in configurations['hive-interactive-env']['properties']:
+ num_llap_nodes = float(configurations['hive-interactive-env']['properties']['num_llap_nodes'])
+ Logger.info("'num_llap_nodes' read from configurations as : {0}".format(num_llap_nodes))
+
+ if num_llap_nodes is None:
+ # Check if 'num_llap_nodes' is input in services array.
+ if 'num_llap_nodes' in services['configurations']['hive-interactive-env']['properties']:
+ num_llap_nodes = float(services['configurations']['hive-interactive-env']['properties']['num_llap_nodes'])
+ Logger.info("'num_llap_nodes' read from services as : {0}".format(num_llap_nodes))
+
+ if num_llap_nodes is None:
+ raise Fail("Couldn't retrieve Hive Server 'num_llap_nodes' config.")
+ assert (num_llap_nodes > 0), "'num_llap_nodes' current value : {0}. Expected value : > 0".format(num_llap_nodes)
+
+ return num_llap_nodes
+

+ def get_max_executors_per_node(self, nm_mem_per_node_normalized, nm_cpus_per_node, mem_per_thread):
+ # TODO: This potentially takes up the entire node leaving no space for AMs.
+ return min(math.floor(nm_mem_per_node_normalized / mem_per_thread), nm_cpus_per_node)

"""
- Returns current value of number of LLAP nodes in cluster (num_llap_nodes)
+ Calculates 'mem_per_thread_for_llap' on first invocation; otherwise returns the value read from 'hive.tez.container.size'.
"""
- def get_num_llap_nodes(self, services):
- if 'num_llap_nodes' in services['configurations']['hive-interactive-env']['properties']:
- num_llap_nodes = float(
- services['configurations']['hive-interactive-env']['properties']['num_llap_nodes'])
- assert (num_llap_nodes > 0), "Number of LLAP nodes read : {0}. Expected value : > 0".format(
- num_llap_nodes)
- return num_llap_nodes
+ def calculate_mem_per_thread_for_llap(self, services, nm_mem_per_node_normalized, cpu_per_nm_host):
+ hive_tez_container_size = self.get_hive_tez_container_size(services)
+ calculated_hive_tez_container_size = None
+ if hive_tez_container_size == self.CONFIG_VALUE_UINITIALIZED:
+ if nm_mem_per_node_normalized <= 1024:
+ calculated_hive_tez_container_size = min(512, nm_mem_per_node_normalized)
+ elif nm_mem_per_node_normalized <= 4096:
+ calculated_hive_tez_container_size = 1024
+ elif nm_mem_per_node_normalized <= 10240:
+ calculated_hive_tez_container_size = 2048
+ elif nm_mem_per_node_normalized <= 24576:
+ calculated_hive_tez_container_size = 3072
+ else:
+ calculated_hive_tez_container_size = 4096
+ Logger.info("Calculated and returning 'hive_tez_container_size' : {0}".format(calculated_hive_tez_container_size))
+ return float(calculated_hive_tez_container_size)
else:
- raise Fail("Couldn't retrieve Hive Server interactive's 'num_llap_nodes' config.")
+ Logger.info("Returning 'hive_tez_container_size' : {0}".format(hive_tez_container_size))
+ return float(hive_tez_container_size)

"""
- Gets HIVE Tez container size (hive.tez.container.size). Takes into account if it has been calculated as part of current
- Stack Advisor invocation.
+ Read YARN config 'yarn.nodemanager.resource.cpu-vcores'.
"""
- def get_hive_tez_container_size(self, services, configurations):
- hive_container_size = None
- # Check if 'hive.tez.container.size' is modified in current ST invocation.
- if 'hive-site' in configurations and 'hive.tez.container.size' in configurations['hive-site']['properties']:
- hive_container_size = float(configurations['hive-site']['properties']['hive.tez.container.size'])
- Logger.info("'hive.tez.container.size' read from configurations as : {0}".format(hive_container_size))
-
- if not hive_container_size:
- # Check if 'hive.tez.container.size' is input in services array.
- if 'hive.tez.container.size' in services['configurations']['hive-site']['properties']:
- hive_container_size = float(services['configurations']['hive-site']['properties']['hive.tez.container.size'])
- Logger.info("'hive.tez.container.size' read from services as : {0}".format(hive_container_size))
- if not hive_container_size:
- raise Fail("Couldn't retrieve Hive Server 'hive.tez.container.size' config.")
+ def get_cpu_per_nm_host(self, services):
+ cpu_per_nm_host = None
+
+ if 'yarn.nodemanager.resource.cpu-vcores' in services['configurations']['yarn-site']['properties']:
+ cpu_per_nm_host = float(services['configurations']['yarn-site']['properties'][
+ 'yarn.nodemanager.resource.cpu-vcores'])
+ assert (cpu_per_nm_host > 0), "'yarn.nodemanager.resource.cpu-vcores' current value : {0}. Expected value : > 0" \
+ .format(cpu_per_nm_host)
+ else:
+ raise Fail("Couldn't retrieve YARN's 'yarn.nodemanager.resource.cpu-vcores' config.")
+ return cpu_per_nm_host

- assert (hive_container_size > 0), "'hive.tez.container.size' current value : {0}. Expected value : > 0".format(
- hive_container_size)
+ """
+ Gets HIVE Tez container size (hive.tez.container.size).
+ """
+ def get_hive_tez_container_size(self, services):
+ hive_container_size = None
+ if 'hive.tez.container.size' in services['configurations'][self.HIVE_INTERACTIVE_SITE]['properties']:
+ hive_container_size = services['configurations'][self.HIVE_INTERACTIVE_SITE]['properties']['hive.tez.container.size']
+ Logger.info("'hive.tez.container.size' read from services as : {0}".format(hive_container_size))
+
+ if hive_container_size is None:
+ raise Fail("Couldn't retrieve Hive Server 'hive.tez.container.size' config.")
+ if hive_container_size != self.CONFIG_VALUE_UINITIALIZED:
+ assert (hive_container_size >= 0), "'hive.tez.container.size' current value : {0}. " \
+ "Expected value : >= 0".format(hive_container_size)
return hive_container_size

"""
@@ -1198,7 +1362,7 @@ class HDP25StackAdvisor(HDP24StackAdvisor):
llap_headroom_space = float(configurations['hive-interactive-env']['properties']['llap_headroom_space'])
Logger.info("'llap_headroom_space' read from configurations as : {0}".format(llap_headroom_space))

- if not llap_headroom_space:
+ if llap_headroom_space is None:
# Check if 'llap_headroom_space' is input in services array.
if 'llap_headroom_space' in services['configurations']['hive-interactive-env']['properties']:
llap_headroom_space = float(services['configurations']['hive-interactive-env']['properties']['llap_headroom_space'])
@@ -1235,7 +1399,7 @@ class HDP25StackAdvisor(HDP24StackAdvisor):
yarn_min_container_size = float(services['configurations']['yarn-site']['properties']['yarn.scheduler.minimum-allocation-mb'])
Logger.info("'yarn.scheduler.minimum-allocation-mb' read from services as : {0}".format(yarn_min_container_size))

- if not yarn_min_container_size:
+ if yarn_min_container_size is None:
raise Fail("Couldn't retrieve YARN's 'yarn.scheduler.minimum-allocation-mb' config.")

assert (yarn_min_container_size > 0), "'yarn.scheduler.minimum-allocation-mb' current value : {0}. " \
@@ -1273,14 +1437,14 @@ class HDP25StackAdvisor(HDP24StackAdvisor):
yarn_nm_mem_in_mb = float(configurations['yarn-site']['properties']['yarn.nodemanager.resource.memory-mb'])
Logger.info("'yarn.nodemanager.resource.memory-mb' read from configurations as : {0}".format(yarn_nm_mem_in_mb))

- if not yarn_nm_mem_in_mb:
+ if yarn_nm_mem_in_mb is None:
# Check if 'yarn.nodemanager.resource.memory-mb' is input in services array.
if 'yarn-site' in services['configurations'] and \
'yarn.nodemanager.resource.memory-mb' in services['configurations']['yarn-site']['properties']:
yarn_nm_mem_in_mb = float(services['configurations']['yarn-site']['properties']['yarn.nodemanager.resource.memory-mb'])
Logger.info("'yarn.nodemanager.resource.memory-mb' read from services as : {0}".format(yarn_nm_mem_in_mb))

- if not yarn_nm_mem_in_mb:
+ if yarn_nm_mem_in_mb is None:
raise Fail("Couldn't retrieve YARN's 'yarn.nodemanager.resource.memory-mb' config.")

assert (yarn_nm_mem_in_mb > 0.0), "'yarn.nodemanager.resource.memory-mb' current value : {0}. " \
@@ -1289,21 +1453,45 @@ class HDP25StackAdvisor(HDP24StackAdvisor):
    return yarn_nm_mem_in_mb

  """
-  Determines Tez App Master container size (tez.am.resource.memory.mb) for tez_hive2/tez-site based on total cluster capacity.
+  Calculates the Tez App Master container size (tez.am.resource.memory.mb) for tez_hive2/tez-site on initialization,
+  if the value read is uninitialized. Otherwise, returns the value read.
  """
-  def calculate_tez_am_container_size(self, total_cluster_capacity):
+  def calculate_tez_am_container_size(self, services, total_cluster_capacity):
    if total_cluster_capacity is None or not isinstance(total_cluster_capacity, long):
      raise Fail("Passed-in 'Total Cluster Capacity' is : '{0}'".format(total_cluster_capacity))
+    tez_am_resource_memory_mb = self.get_tez_am_resource_memory_mb(services)
+    calculated_tez_am_resource_memory_mb = None
+    if tez_am_resource_memory_mb == self.CONFIG_VALUE_UINITIALIZED:
+      if total_cluster_capacity <= 0:
+        raise Fail("Passed-in 'Total Cluster Capacity' ({0}) is invalid.".format(total_cluster_capacity))
+      if total_cluster_capacity <= 4096:
+        calculated_tez_am_resource_memory_mb = 256
+      elif total_cluster_capacity > 4096 and total_cluster_capacity <= 73728:
+        calculated_tez_am_resource_memory_mb = 512
+      elif total_cluster_capacity > 73728:
+        calculated_tez_am_resource_memory_mb = 1536
+      Logger.info("Calculated and returning 'tez_am_resource_memory_mb' as : {0}".format(calculated_tez_am_resource_memory_mb))
+      return float(calculated_tez_am_resource_memory_mb)
+    else:
+      Logger.info("Returning 'tez_am_resource_memory_mb' as : {0}".format(tez_am_resource_memory_mb))
+      return float(tez_am_resource_memory_mb)
-    if total_cluster_capacity <= 0:
-      raise Fail ("Passed-in 'Total Cluster Capacity' ({0}) is Invalid.".format(total_cluster_capacity))
-    if total_cluster_capacity <= 4096:
-      return 256
-    elif total_cluster_capacity > 4096 and total_cluster_capacity <= 73728:
-      return 512
-    elif total_cluster_capacity > 73728:
-      return 1536
+ """
|
|
|
+ Gets Tez's AM resource memory (tez.am.resource.memory.mb) from services.
|
|
|
+ """
|
|
|
+ def get_tez_am_resource_memory_mb(self, services):
|
|
|
+ tez_am_resource_memory_mb = None
|
|
|
+ if 'tez.am.resource.memory.mb' in services['configurations']['tez-interactive-site']['properties']:
|
|
|
+ tez_am_resource_memory_mb = services['configurations']['tez-interactive-site']['properties']['tez.am.resource.memory.mb']
|
|
|
+ Logger.info("'tez.am.resource.memory.mb' read from services as : {0}".format(tez_am_resource_memory_mb))
|
|
|
+
|
|
|
+ if tez_am_resource_memory_mb is None:
|
|
|
+ raise Fail("Couldn't retrieve tez's 'tez.am.resource.memory.mb' config.")
|
|
|
+ if tez_am_resource_memory_mb != self.CONFIG_VALUE_UINITIALIZED:
|
|
|
+ assert (tez_am_resource_memory_mb >= 0), "'tez.am.resource.memory.mb' current value : {0}. " \
|
|
|
+ "Expected value : >= 0".format(tez_am_resource_memory_mb)
|
|
|
+ return tez_am_resource_memory_mb
|
|
|
|
|
|
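Taken together, calculate_tez_am_container_size() now reads the configured value and only falls back to the capacity-based tiers when the sentinel is present. A sketch restating those tiers (capacities assumed to be in MB, as elsewhere in this advisor):

    # Restatement of the sizing tiers above, for quick reference.
    def tez_am_size_mb(total_cluster_capacity_mb):
      if total_cluster_capacity_mb <= 4096:      # clusters up to 4 GB
        return 256
      elif total_cluster_capacity_mb <= 73728:   # clusters up to 72 GB
        return 512
      else:                                      # anything larger
        return 1536

    # e.g. 10 NodeManagers x 8192 MB = 81920 MB total => a 1536 MB Tez AM container.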
"""
|
|
|
Calculate minimum queue capacity required in order to get LLAP and HIVE2 app into running state.
|
|
@@ -1318,10 +1506,10 @@ class HDP25StackAdvisor(HDP24StackAdvisor):
|
|
|
# Calculate based on minimum size required by containers.
|
|
|
yarn_min_container_size = self.get_yarn_min_container_size(services, configurations)
|
|
|
slider_am_size = self.calculate_slider_am_size(yarn_min_container_size)
|
|
|
- hive_tez_container_size = self.get_hive_tez_container_size(services, configurations)
|
|
|
- tez_am_container_size = self.calculate_tez_am_container_size(long(total_cluster_cap))
|
|
|
+ hive_tez_container_size = self.get_hive_tez_container_size(services)
|
|
|
+ tez_am_container_size = self.calculate_tez_am_container_size(services, long(total_cluster_cap))
|
|
|
normalized_val = self._normalizeUp(slider_am_size, yarn_min_container_size) + self._normalizeUp\
|
|
|
- (hive_tez_container_size, yarn_min_container_size) + self._normalizeUp(tez_am_container_size, yarn_min_container_size)
|
|
|
+ (long(hive_tez_container_size), yarn_min_container_size) + self._normalizeUp(tez_am_container_size, yarn_min_container_size)
|
|
|
|
|
|
min_required = max(total_queue_size_at_20_perc, normalized_val)
|
|
|
|
|
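The minimum-capacity math leans on _normalizeUp(), whose body is not in this hunk; it is assumed here to round a request up to the next multiple of YARN's minimum allocation, since YARN grants memory only in such multiples. A hypothetical sketch plus a worked example:

    import math

    # Assumed behavior of _normalizeUp: round 'val' up to the nearest multiple of
    # yarn.scheduler.minimum-allocation-mb.
    def _normalizeUp(val, yarn_min_container_size):
      return long(math.ceil(float(val) / yarn_min_container_size) * yarn_min_container_size)

    # With yarn_min_container_size = 1024:
    #   slider AM 250 MB        -> 1024
    #   hive tez container 4096 -> 4096
    #   tez AM 512              -> 1024
    # normalized_val = 6144; min_required = max(total_queue_size_at_20_perc, 6144)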
@@ -1354,7 +1542,7 @@ class HDP25StackAdvisor(HDP24StackAdvisor):
  (2). Updates 'llap' queue capacity and state, if current selected queue is 'llap', and only 2 queues exist
       at root level : 'default' and 'llap'.
  """
-  def checkAndManageLlapQueue(self, services, configurations, hosts, llap_queue_name):
+  def checkAndManageLlapQueue(self, services, configurations, hosts, llap_queue_name, llap_queue_cap_perc):
    Logger.info("Determining creation/adjustment of 'capacity-scheduler' for 'llap' queue.")
    putHiveInteractiveEnvProperty = self.putProperty(configurations, "hive-interactive-env", services)
    putHiveInteractiveSiteProperty = self.putProperty(configurations, self.HIVE_INTERACTIVE_SITE, services)
@@ -1365,24 +1553,6 @@ class HDP25StackAdvisor(HDP24StackAdvisor):
    capacity_scheduler_properties, received_as_key_value_pair = self.getCapacitySchedulerProperties(services)
    if capacity_scheduler_properties:
      leafQueueNames = self.getAllYarnLeafQueues(capacity_scheduler_properties)
-      # Get the llap Cluster percentage used for 'llap' Queue creation
-      if 'llap_queue_capacity' in services['configurations']['hive-interactive-env']['properties']:
-        llap_slider_cap_percentage = int(
-          services['configurations']['hive-interactive-env']['properties']['llap_queue_capacity'])
-        min_reqd_queue_cap_perc = self.min_queue_perc_reqd_for_llap_and_hive_app(services, hosts, configurations)
-        if min_reqd_queue_cap_perc > 100:
-          min_reqd_queue_cap_perc = 100
-          Logger.info("Received 'Minimum Required LLAP queue capacity' : {0}% (out of bounds), adjusted it to : 100%".format(min_reqd_queue_cap_perc))
-
-        # Adjust 'llap' queue capacity slider value to be minimum required if out of expected bounds.
-        if llap_slider_cap_percentage <= 0 or llap_slider_cap_percentage > 100:
-          Logger.info("Adjusting HIVE 'llap_queue_capacity' from {0}% (invalid size) to {1}%".format(llap_slider_cap_percentage, min_reqd_queue_cap_perc))
-          putHiveInteractiveEnvProperty('llap_queue_capacity', min_reqd_queue_cap_perc)
-          llap_slider_cap_percentage = min_reqd_queue_cap_perc
-      else:
-        Logger.error("Problem retrieving LLAP Queue Capacity. Skipping creating {0} queue".format(llap_queue_name))
-        return
-
      cap_sched_config_keys = capacity_scheduler_properties.keys()

      yarn_default_queue_capacity = -1
@@ -1420,14 +1590,14 @@ class HDP25StackAdvisor(HDP24StackAdvisor):
      if 'default' in leafQueueNames and \
        ((len(leafQueueNames) == 1 and int(yarn_default_queue_capacity) == 100) or \
        ((len(leafQueueNames) == 2 and llap_queue_name in leafQueueNames) and \
-          ((currLlapQueueState == 'STOPPED' and enabled_hive_int_in_changed_configs) or (currLlapQueueState == 'RUNNING' and currLlapQueueCap != llap_slider_cap_percentage)))):
-        adjusted_default_queue_cap = str(100 - llap_slider_cap_percentage)
+          ((currLlapQueueState == 'STOPPED' and enabled_hive_int_in_changed_configs) or (currLlapQueueState == 'RUNNING' and currLlapQueueCap != llap_queue_cap_perc)))):
+        adjusted_default_queue_cap = str(100 - llap_queue_cap_perc)

        hive_user = '*' # Open to all
        if 'hive_user' in services['configurations']['hive-env']['properties']:
          hive_user = services['configurations']['hive-env']['properties']['hive_user']

-        llap_slider_cap_percentage = str(llap_slider_cap_percentage)
+        llap_queue_cap_perc = str(llap_queue_cap_perc)

        # If capacity-scheduler configs are received as one concatenated string, we deposit the changed configs back as
        # one concatenated string.
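Since only 'default' and 'llap' exist as leaf queues in this path, the 'default' queue always receives the complement of the llap percentage, keeping the two leaf queues at 100 combined. For example:

    # Worked example of the capacity split above.
    llap_queue_cap_perc = 35
    adjusted_default_queue_cap = str(100 - llap_queue_cap_perc)  # '65'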
@@ -1454,9 +1624,9 @@ class HDP25StackAdvisor(HDP24StackAdvisor):
                                      + "yarn.scheduler.capacity.root." + llap_queue_name + ".ordering-policy=fifo\n" \
                                      + "yarn.scheduler.capacity.root." + llap_queue_name + ".minimum-user-limit-percent=100\n" \
                                      + "yarn.scheduler.capacity.root." + llap_queue_name + ".maximum-capacity=" \
-                                     + llap_slider_cap_percentage + "\n" \
+                                     + llap_queue_cap_perc + "\n" \
                                      + "yarn.scheduler.capacity.root." + llap_queue_name + ".capacity=" \
-                                     + llap_slider_cap_percentage + "\n" \
+                                     + llap_queue_cap_perc + "\n" \
                                      + "yarn.scheduler.capacity.root." + llap_queue_name + ".acl_submit_applications=" \
                                      + hive_user + "\n" \
                                      + "yarn.scheduler.capacity.root." + llap_queue_name + ".acl_administer_queue=" \
@@ -1485,8 +1655,8 @@ class HDP25StackAdvisor(HDP24StackAdvisor):
          putCapSchedProperty("yarn.scheduler.capacity.root." + llap_queue_name + ".state", "RUNNING")
          putCapSchedProperty("yarn.scheduler.capacity.root." + llap_queue_name + ".ordering-policy", "fifo")
          putCapSchedProperty("yarn.scheduler.capacity.root." + llap_queue_name + ".minimum-user-limit-percent", "100")
-          putCapSchedProperty("yarn.scheduler.capacity.root." + llap_queue_name + ".maximum-capacity", llap_slider_cap_percentage)
-          putCapSchedProperty("yarn.scheduler.capacity.root." + llap_queue_name + ".capacity", llap_slider_cap_percentage)
+          putCapSchedProperty("yarn.scheduler.capacity.root." + llap_queue_name + ".maximum-capacity", llap_queue_cap_perc)
+          putCapSchedProperty("yarn.scheduler.capacity.root." + llap_queue_name + ".capacity", llap_queue_cap_perc)
          putCapSchedProperty("yarn.scheduler.capacity.root." + llap_queue_name + ".acl_submit_applications", hive_user)
          putCapSchedProperty("yarn.scheduler.capacity.root." + llap_queue_name + ".acl_administer_queue", hive_user)
          putCapSchedProperty("yarn.scheduler.capacity.root." + llap_queue_name + ".maximum-am-resource-percent", "1")
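Both branches deposit the same llap-queue settings; only the container differs. When 'capacity-scheduler' was received as one "\n"-joined blob, the entries are appended to that string; when it was received as a dict, each key is written back via putCapSchedProperty. A sketch of the equivalence (illustrative values, not from this patch):

    # The dict form mirrors updated_cap_sched_configs_as_dict; joining it produces
    # the blob form carried by updated_cap_sched_configs_str.
    props = {
      "yarn.scheduler.capacity.root.llap.capacity": "35",
      "yarn.scheduler.capacity.root.llap.maximum-capacity": "35",
      "yarn.scheduler.capacity.root.llap.state": "RUNNING",
    }
    as_blob = "".join("{0}={1}\n".format(k, v) for k, v in sorted(props.items()))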
@@ -1498,19 +1668,16 @@ class HDP25StackAdvisor(HDP24StackAdvisor):
        if updated_cap_sched_configs_str or updated_cap_sched_configs_as_dict:
          if len(leafQueueNames) == 1: # 'llap' queue didn't exist before
            Logger.info("Created YARN Queue : '{0}' with capacity : {1}%. Adjusted 'default' queue capacity to : {2}%" \
-              .format(llap_queue_name, llap_slider_cap_percentage, adjusted_default_queue_cap))
+              .format(llap_queue_name, llap_queue_cap_perc, adjusted_default_queue_cap))
          else: # Queue existed, only adjustments done.
-            Logger.info("Adjusted YARN Queue : '{0}'. Current capacity : {1}%. State: RUNNING.".format(llap_queue_name, llap_slider_cap_percentage))
+            Logger.info("Adjusted YARN Queue : '{0}'. Current capacity : {1}%. State: RUNNING.".format(llap_queue_name, llap_queue_cap_perc))
            Logger.info("Adjusted 'default' queue capacity to : {0}%".format(adjusted_default_queue_cap))

          # Update Hive 'hive.llap.daemon.queue.name' prop to use 'llap' queue.
          putHiveInteractiveSiteProperty('hive.llap.daemon.queue.name', llap_queue_name)
          putHiveInteractiveSiteProperty('hive.server2.tez.default.queues', llap_queue_name)
-          putHiveInteractiveEnvPropertyAttribute('llap_queue_capacity', "minimum", min_reqd_queue_cap_perc)
-          putHiveInteractiveEnvPropertyAttribute('llap_queue_capacity', "maximum", 100)
-
          # Update 'hive.llap.daemon.queue.name' prop combo entries and llap capacity slider visibility.
-          self.setLlapDaemonQueuePropAttributesAndCapSliderVisibility(services, configurations)
+          self.setLlapDaemonQueuePropAttributes(services, configurations)
        else:
          Logger.debug("Not creating/adjusting {0} queue. Current YARN queues : {1}".format(llap_queue_name, list(leafQueueNames)))
      else:
@@ -1589,13 +1756,10 @@ class HDP25StackAdvisor(HDP24StackAdvisor):
  """
  Checks and sets the 'Hive Server Interactive' 'hive.llap.daemon.queue.name' config Property Attributes. Takes into
  account that 'capacity-scheduler' may have changed (got updated) in current Stack Advisor invocation.
-
-  Also, updates the 'llap_queue_capacity' slider visibility.
  """
-  def setLlapDaemonQueuePropAttributesAndCapSliderVisibility(self, services, configurations):
+  def setLlapDaemonQueuePropAttributes(self, services, configurations):
    Logger.info("Determining 'hive.llap.daemon.queue.name' config Property Attributes.")
    putHiveInteractiveSitePropertyAttribute = self.putPropertyAttribute(configurations, self.HIVE_INTERACTIVE_SITE)
-    putHiveInteractiveEnvPropertyAttribute = self.putPropertyAttribute(configurations, "hive-interactive-env")

    capacity_scheduler_properties = dict()
@@ -1645,29 +1809,6 @@ class HDP25StackAdvisor(HDP24StackAdvisor):
        leafQueues = sorted(leafQueues, key=lambda q: q['value'])
        putHiveInteractiveSitePropertyAttribute("hive.llap.daemon.queue.name", "entries", leafQueues)
        Logger.info("'hive.llap.daemon.queue.name' config Property Attributes set to : {0}".format(leafQueues))
-
-      # Update 'llap_queue_capacity' slider visibility to 'true' if current selected queue in 'hive.llap.daemon.queue.name'
-      # is 'llap', else 'false'.
-      llap_daemon_selected_queue_name = None
-      llap_queue_selected_in_current_call = None
-      if self.HIVE_INTERACTIVE_SITE in services['configurations'] and \
-        'hive.llap.daemon.queue.name' in services['configurations'][self.HIVE_INTERACTIVE_SITE]['properties']:
-        llap_daemon_selected_queue_name = services['configurations'][self.HIVE_INTERACTIVE_SITE]['properties']['hive.llap.daemon.queue.name']
-
-      if self.HIVE_INTERACTIVE_SITE in configurations and \
-        'hive.llap.daemon.queue.name' in configurations[self.HIVE_INTERACTIVE_SITE]['properties']:
-        llap_queue_selected_in_current_call = configurations[self.HIVE_INTERACTIVE_SITE]['properties']['hive.llap.daemon.queue.name']
-
-      # Check to see if only 2 queues exist at root level : 'default' and 'llap' and current selected queue in 'hive.llap.daemon.queue.name'
-      # is 'llap'.
-      if len(leafQueueNames) == 2 and \
-        ((llap_daemon_selected_queue_name != None and llap_daemon_selected_queue_name == 'llap') or \
-        (llap_queue_selected_in_current_call != None and llap_queue_selected_in_current_call == 'llap')):
-        putHiveInteractiveEnvPropertyAttribute("llap_queue_capacity", "visible", "true")
-        Logger.info("Setting LLAP queue capacity slider visibility to 'True'.")
-      else:
-        putHiveInteractiveEnvPropertyAttribute("llap_queue_capacity", "visible", "false")
-        Logger.info("Setting LLAP queue capacity slider visibility to 'False'.")
    else:
      Logger.error("Problem retrieving YARN queues. Skipping updating HIVE Server Interactive "
                   "'hive.server2.tez.default.queues' property attributes.")
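For reference, the 'entries' attribute written above is a list of one dict per YARN leaf queue, sorted by 'value'; the 'label' key shown here is an assumption based on the usual Ambari combo-entry shape, since this hunk only shows the 'value' key being used:

    # Assumed shape of leafQueues as deposited into the 'entries' attribute.
    leafQueues = [{"value": "llap", "label": "llap"}, {"value": "default", "label": "default"}]
    leafQueues = sorted(leafQueues, key=lambda q: q['value'])
    # -> [{'value': 'default', 'label': 'default'}, {'value': 'llap', 'label': 'llap'}]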
@@ -1702,6 +1843,29 @@ class HDP25StackAdvisor(HDP24StackAdvisor):
        llap_selected_queue_state = capacity_scheduler_properties.get(llap_selected_queue_state_key)
    return llap_selected_queue_state

+  """
+  Retrieves the passed-in queue's 'AM fraction' from the Capacity Scheduler. Returns a default value of 0.1 if the AM
+  percent pertaining to the passed-in queue is not present.
+  """
+  def __getQueueAmFractionFromCapacityScheduler(self, capacity_scheduler_properties, llap_daemon_selected_queue_name):
+    # Identify the key which contains the AM fraction for 'llap_daemon_selected_queue_name'.
+    cap_sched_keys = capacity_scheduler_properties.keys()
+    llap_selected_queue_am_percent_key = None
+    for key in cap_sched_keys:
+      if key.endswith("." + llap_daemon_selected_queue_name + ".maximum-am-resource-percent"):
+        llap_selected_queue_am_percent_key = key
+        Logger.info("AM percent key for '{0}' queue is : '{1}'".format(llap_daemon_selected_queue_name, llap_selected_queue_am_percent_key))
+        break
+    if llap_selected_queue_am_percent_key is None:
+      Logger.info("Returning default AM percent value : '0.1' for queue : {0}".format(llap_daemon_selected_queue_name))
+      return 0.1 # Default value to use if we couldn't retrieve the queue's corresponding AM percent key.
+    else:
+      llap_selected_queue_am_percent = capacity_scheduler_properties.get(llap_selected_queue_am_percent_key)
+      Logger.info("Returning read value for key '{0}' as : '{1}' for queue : '{2}'".format(llap_selected_queue_am_percent_key,
+                                                                                           llap_selected_queue_am_percent,
+                                                                                           llap_daemon_selected_queue_name))
+      return llap_selected_queue_am_percent
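A usage sketch of the AM-fraction lookup above, in a hypothetical standalone form (the method itself returns the raw property value; the float() conversion is this sketch's own choice):

    cap_sched = {
      "yarn.scheduler.capacity.maximum-am-resource-percent": "0.2",
      "yarn.scheduler.capacity.root.llap.maximum-am-resource-percent": "0.25",
    }

    def queue_am_fraction(cap_sched_props, queue_name):
      # Match the queue's key at any nesting level by suffix, as the method above does.
      for key in cap_sched_props.keys():
        if key.endswith("." + queue_name + ".maximum-am-resource-percent"):
          return float(cap_sched_props[key])
      return 0.1  # documented default when the key is absent

    # queue_am_fraction(cap_sched, 'llap')    -> 0.25
    # queue_am_fraction(cap_sched, 'default') -> 0.1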
+
  """
  Calculates the total available capacity for the passed-in YARN queue of any level based on the percentages.
  """