
AMBARI-15525: Stack Featurize Spark service (jluniya)

Jayush Luniya · commit 7afb17719b · 9 years ago

+ 3 - 0
ambari-common/src/main/python/resource_management/libraries/functions/constants.py

@@ -45,4 +45,7 @@ class StackFeature:
   CONFIG_VERSIONING = "config_versioning"
   RANGER = "ranger"
   NFS = "nfs"
+  TEZ_FOR_SPARK = "tez_for_spark"
+  SPARK_16PLUS = "spark_16plus"
+  SPARK_THRIFTSERVER = "spark_thriftserver"
     

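The three new constants are plain string keys that service scripts reference through StackFeature and resolve against the stack's feature metadata, as the Spark scripts below do. A minimal sketch of a call site, assuming an Ambari agent environment where these modules are importable; the version string is an illustrative example, not a value from this commit:

# Illustrative call site for the new constants.
from resource_management.libraries.functions import StackFeature
from resource_management.libraries.functions.stack_features import check_stack_feature

if check_stack_feature(StackFeature.SPARK_THRIFTSERVER, "2.3.2.0"):
    pass  # e.g. manage the spark-thriftserver component on this stack
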
+ 58 - 21
ambari-common/src/main/python/resource_management/libraries/functions/stack_features.py

@@ -22,20 +22,57 @@ limitations under the License.
 import ambari_simplejson as json
 from resource_management.libraries.script import Script
 from resource_management.libraries.functions.default import default
-  
+from resource_management.libraries.functions.version import compare_versions
+
 _DEFAULT_STACK_FEATURES = {
   "stack_features": [
-    { "name": "snappy", "description" : "Snappy compressor/decompressor support" , "min_version" : "2.0.0.0" , "max_version" : "2.2.0.0" }
-    ,
-    { "name": "express_upgrade", "description" : "Express upgrade support", "min_version" : "2.1.0.0" }
-    ,
-    { "name": "rolling_upgrade", "description" : "Rolling upgrade support", "min_version" : "2.2.0.0" }
-    ,
-    { "name": "config_versioning", "description" : "Configurable versions support", "min_version" : "2.3.0.0" }
-    ,
-    { "name": "ranger", "description" : "Ranger Service support", "min_version" : "2.2.0.0" }
-    ,
-    { "name": "nfs", "description" : "NFS support", "min_version" : "2.3.0.0" }
+    {
+      "name": "snappy",
+      "description": "Snappy compressor/decompressor support",
+      "min_version": "2.0.0.0",
+      "max_version": "2.2.0.0"
+    },
+    {
+      "name": "express_upgrade",
+      "description": "Express upgrade support",
+      "min_version": "2.1.0.0"
+    },
+    {
+      "name": "rolling_upgrade",
+      "description": "Rolling upgrade support",
+      "min_version": "2.2.0.0"
+    },
+    {
+      "name": "config_versioning",
+      "description": "Configurable versions support",
+      "min_version": "2.3.0.0"
+    },
+    {
+      "name": "ranger",
+      "description": "Ranger Service support",
+      "min_version": "2.2.0.0"
+    },
+    {
+      "name": "nfs",
+      "description": "NFS support",
+      "min_version": "2.3.0.0"
+    },
+    {
+      "name": "tez_for_spark",
+      "description": "Tez dependency for Spark",
+      "min_version": "2.2.0.0",
+      "max_version": "2.3.0.0"
+    },
+    {
+      "name": "spark_16plus",
+      "description": "Spark 1.6+",
+      "min_version": "2.4.0.0"
+    },
+    {
+      "name": "spark_thriftserver",
+      "description": "Spark Thrift Server",
+      "min_version": "2.3.2.0"
+    }
   ]
 }
 
@@ -54,14 +91,14 @@ def check_stack_feature(stack_feature, stack_version):
   
   for feature in data["stack_features"]:
     if feature["name"] == stack_feature:
-        if "min_version" in feature:
-            min_version = feature["min_version"] 
-            if Script.is_stack_less_than(min_version):
-                return False
-        if "max_version" in feature:
-            max_version = feature["max_version"]
-            if Script.is_stack_greater_or_equal(max_version):
-                return False
-        return True 
+      if "min_version" in feature:
+        min_version = feature["min_version"]
+        if compare_versions(stack_version, min_version, format = True) < 0:
+          return False
+      if "max_version" in feature:
+        max_version = feature["max_version"]
+        if compare_versions(stack_version, max_version, format = True) >= 0:
+          return False
+      return True
         
   return False    

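In check_stack_feature, min_version is inclusive and max_version is exclusive, so a feature with both bounds is active on the half-open interval [min_version, max_version). A self-contained sketch of that semantics, with deliberately simplified version parsing (Ambari's compare_versions also normalizes version formatting, which this ignores):

# Standalone illustration of the min/max window logic above.
def parse(version):
    return tuple(int(part) for part in version.split("."))

def feature_enabled(name, stack_version, table):
    for entry in table:
        if entry["name"] == name:
            if "min_version" in entry and parse(stack_version) < parse(entry["min_version"]):
                return False
            if "max_version" in entry and parse(stack_version) >= parse(entry["max_version"]):
                return False
            return True
    return False  # unknown features are treated as unsupported

table = [{"name": "tez_for_spark", "min_version": "2.2.0.0", "max_version": "2.3.0.0"}]
print(feature_enabled("tez_for_spark", "2.2.4.0", table))  # True
print(feature_enabled("tez_for_spark", "2.3.0.0", table))  # False: max_version is exclusive
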
+ 6 - 5
ambari-server/src/main/resources/common-services/SPARK/1.2.0.2.2/package/scripts/job_history_server.py

@@ -24,9 +24,10 @@ import os
 from resource_management.libraries.script.script import Script
 from resource_management.libraries.functions import conf_select
 from resource_management.libraries.functions import stack_select
-from resource_management.libraries.functions.version import compare_versions, format_stack_version
 from resource_management.libraries.functions.copy_tarball import copy_to_hdfs
 from resource_management.libraries.functions.check_process_status import check_process_status
+from resource_management.libraries.functions.stack_features import check_stack_feature
+from resource_management.libraries.functions import StackFeature
 from resource_management.core.logger import Logger
 from resource_management.core import shell
 from setup_spark import *
@@ -68,21 +69,21 @@ class JobHistoryServer(Script):
     
 
   def get_stack_to_component(self):
-     return {"HDP": "spark-historyserver"}
+    import params
+    return {params.stack_name : "spark-historyserver"}
 
   def pre_upgrade_restart(self, env, upgrade_type=None):
     import params
 
     env.set_params(params)
-    if params.version and compare_versions(format_stack_version(params.version), '2.2.0.0') >= 0:
+    if params.version and check_stack_feature(StackFeature.ROLLING_UPGRADE, params.version):
       Logger.info("Executing Spark Job History Server Stack Upgrade pre-restart")
       conf_select.select(params.stack_name, "spark", params.version)
       stack_select.select("spark-historyserver", params.version)
 
      # Spark 1.3.1.2.3 and higher (shipped with HDP 2.3) has no Tez dependency,
      # so the Tez tarball only needs to be copied for earlier stacks.
-
-      if params.version and compare_versions(format_stack_version(params.version), '2.3.0.0') < 0:
+      if params.version and check_stack_feature(StackFeature.TEZ_FOR_SPARK, params.version):
         resource_created = copy_to_hdfs(
           "tez",
           params.user_group,

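Note that TEZ_FOR_SPARK bounds the check to [2.2.0.0, 2.3.0.0), which is narrower than the old compare_versions(...) < '2.3.0.0' test. In this method the difference is moot, since the enclosing ROLLING_UPGRADE guard already requires 2.2.0.0 or later; the same substitution appears in spark_service.py below.
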
+ 6 - 11
ambari-server/src/main/resources/common-services/SPARK/1.2.0.2.2/package/scripts/params.py

@@ -20,7 +20,8 @@ limitations under the License.
 
 
 import status_params
-
+from resource_management.libraries.functions.stack_features import check_stack_feature
+from resource_management.libraries.functions import StackFeature
 from setup_spark import *
 
 import resource_management.libraries.functions
@@ -49,6 +50,7 @@ config = Script.get_config()
 tmp_dir = Script.get_tmp_dir()
 
 stack_name = default("/hostLevelParams/stack_name", None)
+stack_root = Script.get_stack_root()
 stack_version_unformatted = str(config['hostLevelParams']['stack_version'])
 stack_version_formatted = format_stack_version(stack_version_unformatted)
 host_sys_prepped = default("/hostLevelParams/host_sys_prepped", False)
@@ -56,23 +58,16 @@ host_sys_prepped = default("/hostLevelParams/host_sys_prepped", False)
 # New Cluster Stack Version that is defined during the RESTART of a Stack Upgrade
 version = default("/commandParams/version", None)
 
-# TODO! FIXME! Version check is not working as of today :
-#   $ yum list installed | grep <stack-selector-tool>
-#   <stack-selector-tool>.noarch                            2.2.1.0-2340.el6           @HDP-2.2
-# And stack_version_formatted returned from hostLevelParams/stack_version is : 2.2.0.0
-# Commenting out for time being
-#stack_is_hdp22_or_further = stack_version_formatted != "" and compare_versions(stack_version_formatted, '2.2.1.0') >= 0
-
 spark_conf = '/etc/spark/conf'
 hadoop_conf_dir = conf_select.get_hadoop_conf_dir()
 hadoop_bin_dir = stack_select.get_hadoop_dir("bin")
 
-if Script.is_stack_greater_or_equal("2.2"):
+if stack_version_formatted and check_stack_feature(StackFeature.ROLLING_UPGRADE, stack_version_formatted):
   hadoop_home = stack_select.get_hadoop_dir("home")
-  spark_conf = format("/usr/hdp/current/{component_directory}/conf")
+  spark_conf = format("{stack_root}/current/{component_directory}/conf")
   spark_log_dir = config['configurations']['spark-env']['spark_log_dir']
   spark_pid_dir = status_params.spark_pid_dir
-  spark_home = format("/usr/hdp/current/{component_directory}")
+  spark_home = format("{stack_root}/current/{component_directory}")
 
 spark_thrift_server_conf_file = spark_conf + "/spark-thrift-sparkconf.conf"
 java_home = config['hostLevelParams']['java_home']

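Replacing the hard-coded /usr/hdp prefix with {stack_root} lets the same scripts serve stacks installed under a different root. A sketch of how the substitution above resolves, with example values standing in for Script.get_stack_root() and the managed component; Ambari's format() pulls these names from the caller's scope, while plain str.format is used here to keep the sketch standalone:

# Illustrative values only; the real ones come from the stack definition
# and the component being managed.
stack_root = "/usr/hdp"                      # e.g. what the HDP stack resolves to
component_directory = "spark-historyserver"  # hypothetical component
spark_conf = "{0}/current/{1}/conf".format(stack_root, component_directory)
print(spark_conf)  # /usr/hdp/current/spark-historyserver/conf
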
+ 3 - 2
ambari-server/src/main/resources/common-services/SPARK/1.2.0.2.2/package/scripts/setup_spark.py

@@ -26,8 +26,9 @@ from resource_management import *
 from resource_management.core.exceptions import ComponentIsNotRunning
 from resource_management.core.logger import Logger
 from resource_management.core import shell
-from resource_management.libraries.functions.version import compare_versions
 from resource_management.libraries.functions.version import format_stack_version
+from resource_management.libraries.functions.stack_features import check_stack_feature
+from resource_management.libraries.functions import StackFeature
 
 def setup_spark(env, type, upgrade_type = None, action = None):
   import params
@@ -103,7 +104,7 @@ def setup_spark(env, type, upgrade_type = None, action = None):
   if effective_version:
     effective_version = format_stack_version(effective_version)
 
-  if params.spark_thrift_fairscheduler_content and effective_version and compare_versions(effective_version, '2.4.0.0') >= 0:
+  if params.spark_thrift_fairscheduler_content and effective_version and check_stack_feature(StackFeature.SPARK_16PLUS, effective_version):
     # create spark-thrift-fairscheduler.xml
     File(os.path.join(params.spark_conf,"spark-thrift-fairscheduler.xml"),
       owner=params.spark_user,

+ 6 - 4
ambari-server/src/main/resources/common-services/SPARK/1.2.0.2.2/package/scripts/spark_client.py

@@ -22,8 +22,9 @@ import sys
 from resource_management import *
 from resource_management.libraries.functions import conf_select
 from resource_management.libraries.functions import stack_select
-from resource_management.libraries.functions.version import compare_versions, format_stack_version
-from resource_management.core.exceptions import ComponentIsNotRunning
+from resource_management.libraries.functions.stack_features import check_stack_feature
+from resource_management.libraries.functions import StackFeature
+from resource_management.core.exceptions import ClientComponentHasNoStatus
 from resource_management.core.logger import Logger
 from resource_management.core import shell
 from setup_spark import setup_spark
@@ -44,13 +45,14 @@ class SparkClient(Script):
     raise ClientComponentHasNoStatus()
   
   def get_stack_to_component(self):
-    return {"HDP": "spark-client"}
+    import params
+    return {params.stack_name : "spark-client"}
 
   def pre_upgrade_restart(self, env, upgrade_type=None):
     import params
 
     env.set_params(params)
-    if params.version and compare_versions(format_stack_version(params.version), '2.2.0.0') >= 0:
+    if params.version and check_stack_feature(StackFeature.ROLLING_UPGRADE, params.version):
       Logger.info("Executing Spark Client Stack Upgrade pre-restart")
       conf_select.select(params.stack_name, "spark", params.version)
       stack_select.select("spark-client", params.version)

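The corrected import matters here: status() raises ClientComponentHasNoStatus (visible in the context above), so keeping only the ComponentIsNotRunning import would make that raise fail with a NameError.
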
+ 4 - 3
ambari-server/src/main/resources/common-services/SPARK/1.2.0.2.2/package/scripts/spark_service.py

@@ -21,11 +21,12 @@ import socket
 
 from resource_management.libraries.script.script import Script
 from resource_management.libraries.resources.hdfs_resource import HdfsResource
-from resource_management.libraries.functions.version import compare_versions
 from resource_management.libraries.functions.copy_tarball import copy_to_hdfs
 from resource_management.libraries.functions import format
 from resource_management.core.resources.system import File, Execute
 from resource_management.libraries.functions.version import format_stack_version
+from resource_management.libraries.functions.stack_features import check_stack_feature
+from resource_management.libraries.functions import StackFeature
 
 def spark_service(name, upgrade_type=None, action=None):
   import params
@@ -36,7 +37,7 @@ def spark_service(name, upgrade_type=None, action=None):
     if effective_version:
       effective_version = format_stack_version(effective_version)
 
-    if effective_version and compare_versions(effective_version, '2.4.0.0') >= 0:
+    if effective_version and check_stack_feature(StackFeature.SPARK_16PLUS, effective_version):
       # copy spark-hdp-assembly.jar to hdfs
       copy_to_hdfs("spark", params.user_group, params.hdfs_user, host_sys_prepped=params.host_sys_prepped)
       # create spark history directory
@@ -56,7 +57,7 @@ def spark_service(name, upgrade_type=None, action=None):
 
    # Spark 1.3.1.2.3 and higher (shipped with HDP 2.3) has no Tez dependency,
    # so the Tez tarball only needs to be copied for earlier stacks.
-    if params.stack_version_formatted and compare_versions(params.stack_version_formatted, '2.3.0.0') < 0:
+    if params.stack_version_formatted and check_stack_feature(StackFeature.TEZ_FOR_SPARK, params.stack_version_formatted):
       resource_created = copy_to_hdfs("tez", params.user_group, params.hdfs_user, host_sys_prepped=params.host_sys_prepped)
       if resource_created:
         params.HdfsResource(None, action="execute")

+ 5 - 4
ambari-server/src/main/resources/common-services/SPARK/1.2.0.2.2/package/scripts/spark_thrift_server.py

@@ -24,8 +24,8 @@ import os
 from resource_management.libraries.script.script import Script
 from resource_management.libraries.functions import conf_select
 from resource_management.libraries.functions import stack_select
-from resource_management.libraries.functions.version import compare_versions, format_stack_version
-from resource_management.libraries.functions.copy_tarball import copy_to_hdfs
+from resource_management.libraries.functions.stack_features import check_stack_feature
+from resource_management.libraries.functions import StackFeature
 from resource_management.libraries.functions.check_process_status import check_process_status
 from resource_management.core.logger import Logger
 from resource_management.core import shell
@@ -64,13 +64,14 @@ class SparkThriftServer(Script):
     check_process_status(status_params.spark_thrift_server_pid_file)
 
   def get_stack_to_component(self):
-     return {"HDP": "spark-thriftserver"}
+    import params
+    return {params.stack_name : "spark-thriftserver"}
 
   def pre_upgrade_restart(self, env, upgrade_type=None):
     import params
 
     env.set_params(params)
-    if params.version and compare_versions(format_stack_version(params.version), '2.3.2.0') >= 0:
+    if params.version and check_stack_feature(StackFeature.SPARK_THRIFTSERVER, params.version):
       Logger.info("Executing Spark Thrift Server Stack Upgrade pre-restart")
       conf_select.select(params.stack_name, "spark", params.version)
       stack_select.select("spark-thriftserver", params.version)

+ 2 - 1
ambari-server/src/main/resources/common-services/ZOOKEEPER/3.4.5.2.0/package/scripts/zookeeper_client.py

@@ -56,7 +56,8 @@ class ZookeeperClient(Script):
 @OsFamilyImpl(os_family=OsFamilyImpl.DEFAULT)
 class ZookeeperClientLinux(ZookeeperClient):
   def get_stack_to_component(self):
-    return {"HDP": "zookeeper-client"}
+    import params
+    return {params.stack_name : "zookeeper-client"}
 
   def install(self, env):
     self.install_packages(env)

+ 2 - 1
ambari-server/src/main/resources/common-services/ZOOKEEPER/3.4.5.2.0/package/scripts/zookeeper_server.py

@@ -65,7 +65,8 @@ class ZookeeperServer(Script):
 class ZookeeperServerLinux(ZookeeperServer):
 
   def get_stack_to_component(self):
-    return {"HDP": "zookeeper-server"}
+    import params
+    return {params.stack_name : "zookeeper-server"}
 
   def install(self, env):
     self.install_packages(env)

+ 47 - 11
ambari-server/src/main/resources/stacks/HDP/2.0.6/properties/stack_features.json

@@ -1,15 +1,51 @@
 {
   "stack_features": [
-    { "name": "snappy", "description" : "Snappy compressor/decompressor support" , "min_version" : "2.0.0.0" , "max_version" : "2.2.0.0" }
-    ,
-    { "name": "express_upgrade", "description" : "Express upgrade support", "min_version" : "2.1.0.0" }
-    ,
-    { "name": "rolling_upgrade", "description" : "Rolling upgrade support", "min_version" : "2.2.0.0" }
-    ,
-    { "name": "config_versioning", "description" : "Configurable versions support", "min_version" : "2.3.0.0" }
-    ,
-    { "name": "ranger", "description" : "Ranger Service support", "min_version" : "2.2.0.0" }
-    ,
-    { "name": "nfs", "description" : "NFS support", "min_version" : "2.3.0.0" }
+    {
+      "name": "snappy",
+      "description": "Snappy compressor/decompressor support",
+      "min_version": "2.0.0.0",
+      "max_version": "2.2.0.0"
+    },
+    {
+      "name": "express_upgrade",
+      "description": "Express upgrade support",
+      "min_version": "2.1.0.0"
+    },
+    {
+      "name": "rolling_upgrade",
+      "description": "Rolling upgrade support",
+      "min_version": "2.2.0.0"
+    },
+    {
+      "name": "config_versioning",
+      "description": "Configurable versions support",
+      "min_version": "2.3.0.0"
+    },
+    {
+      "name": "ranger",
+      "description": "Ranger Service support",
+      "min_version": "2.2.0.0"
+    },
+    {
+      "name": "nfs",
+      "description": "NFS support",
+      "min_version": "2.3.0.0"
+    },
+    {
+      "name": "tez_for_spark",
+      "description": "Tez dependency for Spark",
+      "min_version": "2.2.0.0",
+      "max_version": "2.3.0.0"
+    },
+    {
+      "name": "spark_16plus",
+      "description": "Spark 1.6+",
+      "min_version": "2.4.0.0"
+    },
+    {
+      "name": "spark_thriftserver",
+      "description": "Spark Thrift Server",
+      "min_version": "2.3.2.0"
+    }
   ]
 }
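
A quick way to sanity-check edits to this file is to load it and print each feature's version window; a sketch, assuming it runs from the repo root so the relative path below resolves:

import json

# Path as it appears in this commit; adjust if running from elsewhere.
path = "ambari-server/src/main/resources/stacks/HDP/2.0.6/properties/stack_features.json"
with open(path) as f:
    features = json.load(f)["stack_features"]

for feature in features:
    low = feature.get("min_version", "(none)")
    high = feature.get("max_version", "(open-ended)")
    print("{0}: min {1}, max {2} (exclusive)".format(feature["name"], low, high))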