Prechádzať zdrojové kódy

AMBARI-14616. Add yarn-site properties for Spark Shuffle Aux services (including RU/EU and Fresh install) (dgrinenko via dlysnichenko)

Lisnichenko Dmitro 9 rokov pred
rodič
commit
ceea27dbbd

+ 103 - 0
ambari-server/src/main/java/org/apache/ambari/server/serveraction/upgrades/SparkShufflePropertyConfig.java

@@ -0,0 +1,103 @@
+/*
+ *   Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+package org.apache.ambari.server.serveraction.upgrades;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.ambari.server.AmbariException;
+import org.apache.ambari.server.ServiceNotFoundException;
+import org.apache.ambari.server.actionmanager.HostRoleStatus;
+import org.apache.ambari.server.agent.CommandReport;
+import org.apache.ambari.server.serveraction.AbstractServerAction;
+import org.apache.ambari.server.state.Cluster;
+import org.apache.ambari.server.state.Clusters;
+import org.apache.ambari.server.state.Config;
+import org.apache.commons.lang.StringUtils;
+
+import com.google.inject.Inject;
+
+/**
+ * Computes Yarn properties for SPARK.
+ *
+ * Properties list:
+ * - yarn.nodemanager.aux-services.spark_shuffle.class
+ * - yarn.nodemanager.aux-services  (add spark_shuffle to the list)
+ *
+ * These properties available starting from HDP-2.4 stack.
+ */
+public class SparkShufflePropertyConfig extends AbstractServerAction {
+  private static final String YARN_SITE_CONFIG_TYPE = "yarn-site";
+
+  private static final String YARN_NODEMANAGER_AUX_SERVICES = "yarn.nodemanager.aux-services";
+  private static final String SPARK_SHUFFLE_AUX_STR = "spark_shuffle";
+  private static final String YARN_NODEMANAGER_AUX_SERVICES_SPARK_SHUFFLE_CLASS = "yarn.nodemanager.aux-services.spark_shuffle.class";
+  private static final String YARN_NODEMANAGER_AUX_SERVICES_SPARK_SHUFFLE_CLASS_VALUE = "org.apache.spark.network.yarn.YarnShuffleService";
+
+  @Inject
+  private Clusters clusters;
+
+  @Override
+  public CommandReport execute(ConcurrentMap<String, Object> requestSharedDataContext)
+      throws AmbariException, InterruptedException {
+
+    String clusterName = getExecutionCommand().getClusterName();
+    Cluster cluster = clusters.getCluster(clusterName);
+    Config yarnSiteConfig = cluster.getDesiredConfigByType(YARN_SITE_CONFIG_TYPE);
+
+    if (yarnSiteConfig == null) {
+      return  createCommandReport(0, HostRoleStatus.FAILED,"{}",
+              String.format("Source type %s not found", YARN_SITE_CONFIG_TYPE), "");
+    }
+
+    try {
+      cluster.getService("SPARK");  // check if SPARK service present
+      Map<String, String> yarnSiteProperties = yarnSiteConfig.getProperties();
+
+      final List<String> auxSevices;
+      final String oldAuxServices = yarnSiteProperties.get(YARN_NODEMANAGER_AUX_SERVICES);
+      final String newAuxServices;
+
+      if (yarnSiteProperties.containsKey(YARN_NODEMANAGER_AUX_SERVICES)) {
+        auxSevices = Arrays.asList(oldAuxServices.split(",", -1));
+      } else {
+        auxSevices = new ArrayList<>();
+      }
+      auxSevices.add(SPARK_SHUFFLE_AUX_STR);
+      newAuxServices = StringUtils.join(auxSevices, ",");
+
+      yarnSiteProperties.put(YARN_NODEMANAGER_AUX_SERVICES, newAuxServices);
+      yarnSiteProperties.put(YARN_NODEMANAGER_AUX_SERVICES_SPARK_SHUFFLE_CLASS, YARN_NODEMANAGER_AUX_SERVICES_SPARK_SHUFFLE_CLASS_VALUE);
+      yarnSiteConfig.setProperties(yarnSiteProperties);
+      yarnSiteConfig.persist(false);
+
+      return createCommandReport(0, HostRoleStatus.COMPLETED, "{}",
+        String.format("%s was set from %s to %s. %s was set to %s",
+                YARN_NODEMANAGER_AUX_SERVICES, oldAuxServices, newAuxServices,
+                YARN_NODEMANAGER_AUX_SERVICES_SPARK_SHUFFLE_CLASS_VALUE, YARN_NODEMANAGER_AUX_SERVICES_SPARK_SHUFFLE_CLASS_VALUE), "");
+
+    } catch (ServiceNotFoundException e) {
+      return createCommandReport(0, HostRoleStatus.COMPLETED, "{}",
+          String.format("%s not updated as no SPARK service present on the cluster", YARN_NODEMANAGER_AUX_SERVICES), "");
+    }
+  }
+}

+ 4 - 0
ambari-server/src/main/resources/stacks/HDP/2.3/upgrades/nonrolling-upgrade-2.4.xml

@@ -244,6 +244,10 @@
       <skippable>true</skippable>  <!-- May fix configuration problems manually -->
       <skippable>true</skippable>  <!-- May fix configuration problems manually -->
       <supports-auto-skip-failure>false</supports-auto-skip-failure>
       <supports-auto-skip-failure>false</supports-auto-skip-failure>
 
 
+      <execute-stage service="YARN" component="RESOURCEMANAGER" title="Calculating Yarn Properties for Spark">
+        <task xsi:type="server_action" summary="Calculating Yarn Properties for Spark" class="org.apache.ambari.server.serveraction.upgrades.SparkShufflePropertyConfig" />
+      </execute-stage>
+
       <execute-stage service="TEZ" component="TEZ_CLIENT" title="Apply config changes for Tez">
       <execute-stage service="TEZ" component="TEZ_CLIENT" title="Apply config changes for Tez">
         <task xsi:type="configure" id="hdp_2_4_0_0_tez_client_adjust_tez_lib_uris_property"/>
         <task xsi:type="configure" id="hdp_2_4_0_0_tez_client_adjust_tez_lib_uris_property"/>
       </execute-stage>
       </execute-stage>

+ 3 - 0
ambari-server/src/main/resources/stacks/HDP/2.3/upgrades/upgrade-2.4.xml

@@ -486,6 +486,9 @@
       </component>
       </component>
 
 
       <component name="RESOURCEMANAGER">
       <component name="RESOURCEMANAGER">
+        <pre-upgrade>
+          <task xsi:type="server_action" summary="Calculating Yarn Properties for Spark" class="org.apache.ambari.server.serveraction.upgrades.SparkShufflePropertyConfig" />
+        </pre-upgrade>
         <pre-downgrade /> <!--  no-op to prevent config changes on downgrade -->
         <pre-downgrade /> <!--  no-op to prevent config changes on downgrade -->
 
 
         <upgrade>
         <upgrade>

+ 77 - 1
ambari-server/src/main/resources/stacks/HDP/2.4/services/stack_advisor.py

@@ -17,5 +17,81 @@ See the License for the specific language governing permissions and
 limitations under the License.
 limitations under the License.
 """
 """
 
 
+
 class HDP24StackAdvisor(HDP23StackAdvisor):
 class HDP24StackAdvisor(HDP23StackAdvisor):
-  pass
+  def getServiceConfigurationRecommenderDict(self):
+    parent_recommend_conf_dict = super(HDP24StackAdvisor, self).getServiceConfigurationRecommenderDict()
+    child_recommend_conf_dict = {
+      "YARN": self.recommendYARNConfigurations
+    }
+    parent_recommend_conf_dict.update(child_recommend_conf_dict)
+    return parent_recommend_conf_dict
+
+  def getServiceConfigurationValidators(self):
+    parentValidators = super(HDP24StackAdvisor, self).getServiceConfigurationValidators()
+    childValidators = {
+      "YARN": {"yarn-site": self.validateYARNConfigurations}
+    }
+    self.mergeValidators(parentValidators, childValidators)
+    return parentValidators
+
+  def recommendYARNConfigurations(self, configurations, clusterData, services, hosts):
+    super(HDP24StackAdvisor, self).recommendYARNConfigurations(configurations, clusterData, services, hosts)
+
+    yarn_site_config = "yarn-site"
+    properties = services["configurations"] if yarn_site_config in services["configurations"] else []
+    yarn_site_properties = properties[yarn_site_config]["properties"] if yarn_site_config in properties and \
+                                                                         "properties" in properties[yarn_site_config] else []
+    put_yarn_site_property = self.putProperty(configurations, yarn_site_config, services)
+    put_yarn_site_property_attributes = self.putPropertyAttribute(configurations, yarn_site_config)
+    services_list = [service["StackServices"]["service_name"] for service in services["services"]]
+
+    if 'SPARK' in services_list:
+      if "yarn.nodemanager.aux-services" in yarn_site_properties:
+        aux_services = yarn_site_properties["yarn.nodemanager.aux-services"].split(",")
+        aux_services.append("spark_shuffle")
+        put_yarn_site_property("yarn.nodemanager.aux-services", ",".join(aux_services))
+      else:
+        put_yarn_site_property("yarn.nodemanager.aux-services", "spark_shuffle")
+
+      put_yarn_site_property("yarn.nodemanager.aux-services.spark_shuffle.class",
+                             "org.apache.spark.network.yarn.YarnShuffleService")
+    else:
+      put_yarn_site_property_attributes("yarn.nodemanager.aux-services.spark_shuffle.class", "delete", "true")
+
+  def validateYARNConfigurations(self, properties, recommendedDefaults, configurations, services, hosts):
+    yarn_site = properties
+    validationItems = []
+    services_list = [service["StackServices"]["service_name"] for service in services["services"]]
+
+    if "SPARK" in services_list and 'YARN' in services_list:
+      # yarn.nodemanager.aux-services = ...,spark_shuffle,....
+      # yarn.nodemanager.aux-services.spark_shuffle.class = <not set>
+      if "yarn.nodemanager.aux-services" in yarn_site \
+        and "spark_shuffle" in yarn_site["yarn.nodemanager.aux-services"].lower() \
+        and "yarn.nodemanager.aux-services.spark_shuffle.class" not in yarn_site:
+        validationItems.append({
+          "config-name": "yarn.nodemanager.aux-services.spark_shuffle.class",
+          "item": self.getErrorItem("If spark_shuffle is listed in the aux-services, property value for " +
+                                    "yarn.nodemanager.aux-services.spark_shuffle.class need to be set")
+        })
+
+      # yarn.nodemanager.aux-services = <not set>
+      # yarn.nodemanager.aux-services.spark_shuffle.class = is set
+      spark_aux_service_warning = False
+      if "yarn.nodemanager.aux-services" in yarn_site and "spark_shuffle" not in yarn_site[
+        "yarn.nodemanager.aux-services"].lower() \
+        and "yarn.nodemanager.aux-services.spark_shuffle.class" in yarn_site:
+        spark_aux_service_warning = True
+
+      if "yarn.nodemanager.aux-services" not in yarn_site and "yarn.nodemanager.aux-services.spark_shuffle.class" in yarn_site:
+        spark_aux_service_warning = True
+
+      if spark_aux_service_warning:
+        validationItems.append({
+          "config-name": "yarn.nodemanager.aux-services",
+          "item": self.getWarnItem("If yarn.nodemanager.aux-services.spark_shuffle.class is set, probably " +
+                                   "aux-services property need to be updated to enable spark_shuffle")
+        })
+
+    return self.toConfigurationValidationProblems(validationItems, "yarn-site")