
AMBARI-18946 Ambari Integration for Zeppelin and Spark 2.0 (r-kamath)

Renjith Kamath, 9 years ago
Parent commit: 182479d3ce

+ 10 - 5
ambari-server/src/main/resources/common-services/ZEPPELIN/0.6.0.2.5/package/scripts/master.py

@@ -58,8 +58,12 @@ class Master(Script):
     # update the configs specified by user
     self.configure(env)
 
-    Execute('echo spark_version:' + params.spark_version + ' detected for spark_home: '
-            + params.spark_home + ' >> ' + params.zeppelin_log_file, user=params.zeppelin_user)
+    if params.spark_version:
+      Execute('echo spark_version:' + str(params.spark_version) + ' detected for spark_home: '
+              + params.spark_home + ' >> ' + params.zeppelin_log_file, user=params.zeppelin_user)
+    if params.spark2_version:
+      Execute('echo spark2_version:' + str(params.spark2_version) + ' detected for spark2_home: '
+              + params.spark2_home + ' >> ' + params.zeppelin_log_file, user=params.zeppelin_user)
 
   def create_zeppelin_dir(self, params):
     params.HdfsResource(format("/user/{zeppelin_user}"),
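Note: the unconditional Execute above failed on hosts without a Spark 1.x client, because params.spark_version is then None and concatenating None to a str raises TypeError. A minimal standalone sketch (not Ambari code) of the failure the guard and str() avoid:

# Hypothetical value on a host where only Spark 2.x is installed.
spark_version = None
try:
    'echo spark_version:' + spark_version
except TypeError:
    print("concatenating None into the log line fails")

if spark_version:  # the added guard skips the Execute call entirely in that case
    print('echo spark_version:' + str(spark_version))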
@@ -138,9 +142,10 @@ class Master(Script):
     File(format("{params.conf_dir}/log4j.properties"), content=params.log4j_properties_content,
          owner=params.zeppelin_user, group=params.zeppelin_group)
 
-    # copy hive-site.xml
-    File(format("{params.conf_dir}/hive-site.xml"), content=StaticFile("/etc/spark/conf/hive-site.xml"),
-         owner=params.zeppelin_user, group=params.zeppelin_group)
+    # copy hive-site.xml only if Spark 1.x is installed
+    if 'spark-defaults' in params.config['configurations']:
+        File(format("{params.conf_dir}/hive-site.xml"), content=StaticFile("/etc/spark/conf/hive-site.xml"),
+             owner=params.zeppelin_user, group=params.zeppelin_group)
 
     if len(params.hbase_master_hosts) > 0:
       # copy hbase-site.xml
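The hive-site.xml copy now keys off the presence of the spark-defaults section, i.e. a Spark 1.x client. A minimal sketch with a stand-in config dict (the nested layout is assumed to mirror what Script.get_config() returns, per params.py):

# Hypothetical cluster config: only Spark 2.x is installed.
config = {'configurations': {'spark2-defaults': {}, 'zeppelin-env': {}}}

if 'spark-defaults' in config['configurations']:
    print("copy /etc/spark/conf/hive-site.xml into the Zeppelin conf dir")
else:
    print("skip the copy: no Spark 1.x client on this host")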

+ 21 - 7
ambari-server/src/main/resources/common-services/ZEPPELIN/0.6.0.2.5/package/scripts/params.py

@@ -39,6 +39,14 @@ def get_port_from_url(address):
   else:
     return address
 
+def extract_spark_version(spark_home):
+  try:
+    with open(spark_home + "/RELEASE") as fline:
+      return re.search('Spark (\d\.\d).+', fline.readline().rstrip()).group(1)
+  except:
+    pass
+  return None
+
 
 # server configurations
 config = Script.get_config()
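extract_spark_version reads only the first line of the RELEASE file under the given Spark home and pulls the major.minor version with a regex; any failure (missing file, unexpected format) yields None. A standalone sketch of that parse, using an assumed first line of a RELEASE file (actual wording varies by build):

import re

# Assumed example of the first line of $SPARK_HOME/RELEASE.
first_line = "Spark 2.0.0 built for Hadoop 2.7.3"

match = re.search(r'Spark (\d\.\d).+', first_line.rstrip())
print(match.group(1) if match else None)  # -> 2.0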
@@ -58,13 +66,17 @@ spark_jar_dir = config['configurations']['zeppelin-env']['zeppelin.spark.jar.dir
 spark_jar = format("{spark_jar_dir}/zeppelin-spark-0.5.5-SNAPSHOT.jar")
 setup_view = True
 temp_file = config['configurations']['zeppelin-env']['zeppelin.temp.file']
-spark_home = os.path.join(stack_root, "current", "spark-client")
 
-try:
-  fline = open(spark_home + "/RELEASE").readline().rstrip()
-  spark_version = re.search('Spark (\d\.\d).+', fline).group(1)
-except:
-  pass
+spark_home = None
+spark_version = None
+spark2_home = None
+spark2_version = None
+if 'spark-defaults' in config['configurations']:
+  spark_home = os.path.join(stack_root, "current", 'spark-client')
+  spark_version = extract_spark_version(spark_home)
+if 'spark2-defaults' in config['configurations']:
+  spark2_home = os.path.join(stack_root, "current", 'spark2-client')
+  spark2_version = extract_spark_version(spark2_home)
 
 # New Cluster Stack Version that is defined during the RESTART of a Rolling Upgrade
 version = default("/commandParams/version", None)
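With this hunk, each Spark flavor only gets a home and version when its defaults section is present; the other pair stays None. A compact sketch with stand-in values (stack_root and the config layout are assumptions mirroring params.py, extract_spark_version is stubbed):

import os

stack_root = "/usr/hdp"
config = {'configurations': {'spark2-defaults': {}}}  # hypothetical: Spark 2.x only

def extract_spark_version(spark_home):
    return None  # stub; the real helper parses spark_home + "/RELEASE"

spark_home = spark_version = spark2_home = spark2_version = None
if 'spark-defaults' in config['configurations']:
    spark_home = os.path.join(stack_root, "current", "spark-client")
    spark_version = extract_spark_version(spark_home)
if 'spark2-defaults' in config['configurations']:
    spark2_home = os.path.join(stack_root, "current", "spark2-client")
    spark2_version = extract_spark_version(spark2_home)

print(spark_home, spark2_home)  # -> None /usr/hdp/current/spark2-client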
@@ -126,8 +138,10 @@ if 'hbase_master_hosts' in master_configs and 'hbase-site' in config['configurat
   hbase_zookeeper_quorum = config['configurations']['hbase-site']['hbase.zookeeper.quorum']
 
 # detect spark queue
-if 'spark.yarn.queue' in config['configurations']['spark-defaults']:
+if 'spark-defaults' in config['configurations'] and 'spark.yarn.queue' in config['configurations']['spark-defaults']:
   spark_queue = config['configurations']['spark-defaults']['spark.yarn.queue']
+elif 'spark2-defaults' in config['configurations'] and 'spark.yarn.queue' in config['configurations']['spark2-defaults']:
+  spark_queue = config['configurations']['spark2-defaults']['spark.yarn.queue']
 else:
   spark_queue = 'default'
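Queue detection now checks spark-defaults first, falls back to spark2-defaults, and finally to 'default'. A minimal sketch of that resolution order with a stand-in config (layout assumed from params.py):

# Hypothetical: only Spark 2.x configured, with a custom YARN queue.
config = {'configurations': {'spark2-defaults': {'spark.yarn.queue': 'etl'}}}
confs = config['configurations']

if 'spark-defaults' in confs and 'spark.yarn.queue' in confs['spark-defaults']:
    spark_queue = confs['spark-defaults']['spark.yarn.queue']
elif 'spark2-defaults' in confs and 'spark.yarn.queue' in confs['spark2-defaults']:
    spark_queue = confs['spark2-defaults']['spark.yarn.queue']
else:
    spark_queue = 'default'

print(spark_queue)  # -> etl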