浏览代码

AMBARI-10461. Spark History Server does not start with non-root agent (aonishuk)

Andrew Onishuk 10 年之前
父节点
当前提交
85acd9fa92
共有 16 个文件被更改,包括 541 次插入334 次删除
  1. 1 0
      ambari-common/src/main/python/resource_management/libraries/functions/__init__.py
  2. 2 2
      ambari-common/src/main/python/resource_management/libraries/providers/properties_file.py
  3. 1 0
      ambari-common/src/main/python/resource_management/libraries/resources/properties_file.py
  4. 25 46
      ambari-server/src/main/resources/common-services/SPARK/1.2.0.2.2/package/scripts/job_history_server.py
  5. 2 16
      ambari-server/src/main/resources/common-services/SPARK/1.2.0.2.2/package/scripts/params.py
  6. 4 28
      ambari-server/src/main/resources/common-services/SPARK/1.2.0.2.2/package/scripts/service_check.py
  7. 12 133
      ambari-server/src/main/resources/common-services/SPARK/1.2.0.2.2/package/scripts/setup_spark.py
  8. 10 11
      ambari-server/src/main/resources/common-services/SPARK/1.2.0.2.2/package/scripts/spark_client.py
  9. 46 0
      ambari-server/src/main/resources/common-services/SPARK/1.2.0.2.2/package/scripts/spark_service.py
  10. 6 10
      ambari-server/src/test/python/stacks/2.2/KNOX/test_knox_gateway.py
  11. 10 11
      ambari-server/src/test/python/stacks/2.2/SLIDER/test_slider_client.py
  12. 180 75
      ambari-server/src/test/python/stacks/2.2/SPARK/test_job_history_server.py
  13. 120 0
      ambari-server/src/test/python/stacks/2.2/SPARK/test_spark_client.py
  14. 58 0
      ambari-server/src/test/python/stacks/2.2/SPARK/test_spark_service_check.py
  15. 32 1
      ambari-server/src/test/python/stacks/2.2/configs/default.json
  16. 32 1
      ambari-server/src/test/python/stacks/2.2/configs/secured.json

+ 1 - 0
ambari-common/src/main/python/resource_management/libraries/functions/__init__.py

@@ -40,6 +40,7 @@ from resource_management.libraries.functions.format_jvm_option import *
 from resource_management.libraries.functions.constants import *
 from resource_management.libraries.functions.get_hdp_version import *
 from resource_management.libraries.functions.get_lzo_packages import *
+from resource_management.libraries.functions.dynamic_variable_interpretation import *
 
 IS_WINDOWS = platform.system() == "Windows"
 

+ 2 - 2
ambari-common/src/main/python/resource_management/libraries/providers/properties_file.py

@@ -41,8 +41,8 @@ class PropertiesFileProvider(Provider):
 
     config_content = InlineTemplate('''# Generated by Apache Ambari. {{time.asctime(time.localtime())}}
     {% for key, value in properties_dict|dictsort %}
-{{key}}={{value}}{% endfor %}
-    ''', extra_imports=[time], properties_dict=self.resource.properties)
+{{key}}{{key_value_delimiter}}{{value}}{% endfor %}
+    ''', extra_imports=[time], properties_dict=self.resource.properties, key_value_delimiter=self.resource.key_value_delimiter)
 
     Logger.info(format("Generating properties file: {filepath}"))
 

+ 1 - 0
ambari-common/src/main/python/resource_management/libraries/resources/properties_file.py

@@ -33,5 +33,6 @@ class PropertiesFile(Resource):
   mode = ResourceArgument()
   owner = ResourceArgument()
   group = ResourceArgument()
+  key_value_delimiter = ResourceArgument(default="=")
 
   actions = Resource.actions + ["create"]

+ 25 - 46
ambari-server/src/main/resources/common-services/SPARK/1.2.0.2.2/package/scripts/job_history_server.py

@@ -29,74 +29,53 @@ from resource_management.core.exceptions import ComponentIsNotRunning
 from resource_management.core.logger import Logger
 from resource_management.core import shell
 from setup_spark import *
+from spark_service import spark_service
 
 
 class JobHistoryServer(Script):
 
-  def get_stack_to_component(self):
-     return {"HDP": "spark-historyserver"}
-
-  def pre_rolling_restart(self, env):
+  def install(self, env):
     import params
-
     env.set_params(params)
-    if params.version and compare_versions(format_hdp_stack_version(params.version), '2.2.0.0') >= 0:
-      Execute(format("hdp-select set spark-historyserver {version}"))
-      copy_tarballs_to_hdfs('tez', 'spark-historyserver', params.spark_user, params.hdfs_user, params.user_group)
-
-  def install(self, env):
+    
     self.install_packages(env)
+    
+  def configure(self, env):
     import params
     env.set_params(params)
-
-  def stop(self, env, rolling_restart=False):
+    
+    setup_spark(env, 'server', action = 'config')
+    
+  def start(self, env, rolling_restart=False):
     import params
-
     env.set_params(params)
-    daemon_cmd = format('{spark_history_server_stop}')
-    Execute(daemon_cmd,
-            user=params.spark_user,
-            environment={'JAVA_HOME': params.java_home}
-    )
-    if os.path.isfile(params.spark_history_server_pid_file):
-      os.remove(params.spark_history_server_pid_file)
-
+    
+    self.configure(env)
+    spark_service(action='start')
 
-  def start(self, env, rolling_restart=False):
+  def stop(self, env, rolling_restart=False):
     import params
-
     env.set_params(params)
-    setup_spark(env, 'server', action='start')
-
-    if params.security_enabled:
-      spark_kinit_cmd = format("{kinit_path_local} -kt {spark_kerberos_keytab} {spark_principal}; ")
-      Execute(spark_kinit_cmd, user=params.spark_user)
-
-    copy_tarballs_to_hdfs('tez', 'spark-historyserver', params.spark_user, params.hdfs_user, params.user_group)
-
-    daemon_cmd = format('{spark_history_server_start}')
-    no_op_test = format(
-      'ls {spark_history_server_pid_file} >/dev/null 2>&1 && ps -p `cat {spark_history_server_pid_file}` >/dev/null 2>&1')
-    Execute(daemon_cmd,
-            user=params.spark_user,
-            environment={'JAVA_HOME': params.java_home},
-            not_if=no_op_test
-    )
+    
+    spark_service(action='stop')
 
   def status(self, env):
     import status_params
-
     env.set_params(status_params)
-    pid_file = format("{spark_history_server_pid_file}")
-    # Recursively check all existing gmetad pid files
-    check_process_status(pid_file)
 
-  # Note: This function is not called from start()/install()
-  def configure(self, env):
+    check_process_status(status_params.spark_history_server_pid_file)
+    
+
+  def get_stack_to_component(self):
+     return {"HDP": "spark-historyserver"}
+
+  def pre_rolling_restart(self, env):
     import params
 
     env.set_params(params)
-    setup_spark(env, 'server', action = 'config')
+    if params.version and compare_versions(format_hdp_stack_version(params.version), '2.2.0.0') >= 0:
+      Execute(format("hdp-select set spark-historyserver {version}"))
+      copy_tarballs_to_hdfs('tez', 'spark-historyserver', params.spark_user, params.hdfs_user, params.user_group)
 
 if __name__ == "__main__":
   JobHistoryServer().execute()

+ 2 - 16
ambari-server/src/main/resources/common-services/SPARK/1.2.0.2.2/package/scripts/params.py

@@ -93,21 +93,7 @@ else:
 spark_hive_sec_authorization_enabled = "false"
 spark_yarn_historyServer_address = default(spark_history_server_host, "localhost")
 
-spark_yarn_applicationMaster_waitTries = default(
-  "/configurations/spark-defaults/spark.yarn.applicationMaster.waitTries", '10')
-spark_yarn_submit_file_replication = default("/configurations/spark-defaults/spark.yarn.submit.file.replication", '3')
-spark_yarn_preserve_staging_files = default("/configurations/spark-defaults/spark.yarn.preserve.staging.files", "false")
-spark_yarn_scheduler_heartbeat_interval = default(
-  "/configurations/spark-defaults/spark.yarn.scheduler.heartbeat.interval-ms", "5000")
-spark_yarn_queue = default("/configurations/spark-defaults/spark.yarn.queue", "default")
-spark_yarn_containerLauncherMaxThreads = default(
-  "/configurations/spark-defaults/spark.yarn.containerLauncherMaxThreads", "25")
-spark_yarn_max_executor_failures = default("/configurations/spark-defaults/spark.yarn.max.executor.failures", "3")
-spark_yarn_executor_memoryOverhead = default("/configurations/spark-defaults/spark.yarn.executor.memoryOverhead", "384")
-spark_yarn_driver_memoryOverhead = default("/configurations/spark-defaults/spark.yarn.driver.memoryOverhead", "384")
-spark_history_provider = default("/configurations/spark-defaults/spark.history.provider",
-                                 "org.apache.spark.deploy.yarn.history.YarnHistoryProvider")
-spark_history_ui_port = default("/configurations/spark-defaults/spark.history.ui.port", "18080")
+spark_history_ui_port = config['configurations']['spark-defaults']['spark.history.ui.port']
 
 spark_env_sh = config['configurations']['spark-env']['content']
 spark_log4j_properties = config['configurations']['spark-log4j-properties']['content']
@@ -117,7 +103,7 @@ spark_javaopts_properties = config['configurations']['spark-javaopts-properties'
 hive_server_host = default("/clusterHostInfo/hive_server_host", [])
 is_hive_installed = not len(hive_server_host) == 0
 
-hdp_full_version = get_hdp_version()
+hdp_full_version = functions.get_hdp_version('spark-client')
 
 spark_driver_extraJavaOptions = str(config['configurations']['spark-defaults']['spark.driver.extraJavaOptions'])
 if spark_driver_extraJavaOptions.find('-Dhdp.version') == -1:

+ 4 - 28
ambari-server/src/main/resources/common-services/SPARK/1.2.0.2.2/package/scripts/service_check.py

@@ -27,40 +27,16 @@ from resource_management.core.logger import Logger
 class SparkServiceCheck(Script):
   def service_check(self, env):
     import params
-
     env.set_params(params)
 
     if params.security_enabled:
       spark_kinit_cmd = format("{kinit_path_local} -kt {spark_kerberos_keytab} {spark_principal}; ")
       Execute(spark_kinit_cmd, user=params.spark_user)
 
-    command = "curl"
-    httpGssnegotiate = "--negotiate"
-    userpswd = "-u:"
-    insecure = "-k"
-    silent = "-s"
-    out = "-o /dev/null"
-    head = "-w'%{http_code}'"
-    url = 'http://' + params.spark_history_server_host + ':' + str(params.spark_history_ui_port)
-
-    command_with_flags = [command, silent, out, head, httpGssnegotiate, userpswd, insecure, url]
-
-    is_running = False
-    for i in range(1,11):
-      proc = subprocess.Popen(command_with_flags, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
-      Logger.info("Try %d, command: %s" % (i, " ".join(command_with_flags)))
-      (stdout, stderr) = proc.communicate()
-      response = stdout
-      if '200' in response:
-        is_running = True
-        Logger.info('Spark Job History Server up and running')
-        break
-      Logger.info("Response: %s" % str(response))
-      time.sleep(5)
-
-    if is_running == False :
-      Logger.info('Spark Job History Server not running.')
-      raise ComponentIsNotRunning()
+    Execute(format("curl -s -o /dev/null -w'%{{http_code}}' --negotiate -u: -k http://{spark_history_server_host}:{spark_history_ui_port} | grep 200"),
+      tries = 10,
+      try_sleep=3,
+    )
 
 if __name__ == "__main__":
   SparkServiceCheck().execute()

+ 12 - 133
ambari-server/src/main/resources/common-services/SPARK/1.2.0.2.2/package/scripts/setup_spark.py

@@ -31,25 +31,22 @@ from resource_management.core import shell
 def setup_spark(env, type, action = None):
   import params
 
-  env.set_params(params)
-
   Directory([params.spark_pid_dir, params.spark_log_dir],
             owner=params.spark_user,
             group=params.user_group,
             recursive=True
   )
-  if type == 'server':
-    if action == 'start' or action == 'config':
-      params.HdfsDirectory(params.spark_hdfs_user_dir,
-                         action="create",
-                         owner=params.spark_user,
-                         mode=0775
-      )
-
-  file_path = params.spark_conf + '/spark-defaults.conf'
-  create_file(file_path)
-
-  write_properties_to_file(file_path, spark_properties(params))
+  if type == 'server' and action == 'config':
+    params.HdfsDirectory(params.spark_hdfs_user_dir,
+                       action="create",
+                       owner=params.spark_user,
+                       mode=0775
+    )
+    
+  PropertiesFile(format("{spark_conf}/spark-defaults.conf"),
+    properties = params.config['configurations']['spark-defaults'],
+    key_value_delimiter = " ",               
+  )
 
   # create spark-env.sh in etc/conf dir
   File(os.path.join(params.spark_conf, 'spark-env.sh'),
@@ -79,127 +76,9 @@ def setup_spark(env, type, action = None):
   )
 
   if params.is_hive_installed:
-    hive_config = get_hive_config()
     XmlConfig("hive-site.xml",
               conf_dir=params.spark_conf,
-              configurations=hive_config,
+              configurations=params.config['configurations']['hive-site'],
               owner=params.spark_user,
               group=params.spark_group,
               mode=0644)
-
-def get_hive_config():
-  import params
-  hive_conf_dict = dict()
-  hive_conf_dict['hive.metastore.uris'] = params.config['configurations']['hive-site']['hive.metastore.uris']
-  if params.security_enabled:
-    hive_conf_dict['hive.metastore.sasl.enabled'] =  str(params.config['configurations']['hive-site']['hive.metastore.sasl.enabled']).lower()
-    hive_conf_dict['hive.metastore.kerberos.keytab.file'] = params.config['configurations']['hive-site']['hive.metastore.kerberos.keytab.file']
-    hive_conf_dict['hive.server2.authentication.spnego.principal'] =  params.config['configurations']['hive-site']['hive.server2.authentication.spnego.principal']
-    hive_conf_dict['hive.server2.authentication.spnego.keytab'] = params.config['configurations']['hive-site']['hive.server2.authentication.spnego.keytab']
-    hive_conf_dict['hive.metastore.kerberos.principal'] = params.config['configurations']['hive-site']['hive.metastore.kerberos.principal']
-    hive_conf_dict['hive.server2.authentication.kerberos.principal'] = params.config['configurations']['hive-site']['hive.server2.authentication.kerberos.principal']
-    hive_conf_dict['hive.server2.authentication.kerberos.keytab'] =  params.config['configurations']['hive-site']['hive.server2.authentication.kerberos.keytab']
-    hive_conf_dict['hive.security.authorization.enabled'] = params.spark_hive_sec_authorization_enabled
-    hive_conf_dict['hive.server2.enable.doAs'] =  str(params.config['configurations']['hive-site']['hive.server2.enable.doAs']).lower()
-
-  return hive_conf_dict
-
-
-def spark_properties(params):
-  spark_dict = dict()
-
-  all_spark_config  = params.config['configurations']['spark-defaults']
-  #Add all configs unfiltered first to handle Custom case.
-  spark_dict = all_spark_config.copy()
-
-  spark_dict['spark.yarn.executor.memoryOverhead'] = params.spark_yarn_executor_memoryOverhead
-  spark_dict['spark.yarn.driver.memoryOverhead'] = params.spark_yarn_driver_memoryOverhead
-  spark_dict['spark.yarn.applicationMaster.waitTries'] = params.spark_yarn_applicationMaster_waitTries
-  spark_dict['spark.yarn.scheduler.heartbeat.interval-ms'] = params.spark_yarn_scheduler_heartbeat_interval
-  spark_dict['spark.yarn.max_executor.failures'] = params.spark_yarn_max_executor_failures
-  spark_dict['spark.yarn.queue'] = params.spark_yarn_queue
-  spark_dict['spark.yarn.containerLauncherMaxThreads'] = params.spark_yarn_containerLauncherMaxThreads
-  spark_dict['spark.yarn.submit.file.replication'] = params.spark_yarn_submit_file_replication
-  spark_dict['spark.yarn.preserve.staging.files'] = params.spark_yarn_preserve_staging_files
-
-  # Hardcoded paramaters to be added to spark-defaults.conf
-  spark_dict['spark.yarn.historyServer.address'] = params.spark_history_server_host + ':' + str(
-    params.spark_history_ui_port)
-  spark_dict['spark.yarn.services'] = 'org.apache.spark.deploy.yarn.history.YarnHistoryService'
-  spark_dict['spark.history.provider'] = 'org.apache.spark.deploy.yarn.history.YarnHistoryProvider'
-  spark_dict['spark.history.ui.port'] = params.spark_history_ui_port
-
-  spark_dict['spark.driver.extraJavaOptions'] = params.spark_driver_extraJavaOptions
-  spark_dict['spark.yarn.am.extraJavaOptions'] = params.spark_yarn_am_extraJavaOptions
-
-
-  return spark_dict
-
-
-def write_properties_to_file(file_path, value):
-  for key in value:
-    modify_config(file_path, key, value[key])
-
-
-def modify_config(filepath, variable, setting):
-  var_found = False
-  already_set = False
-  V = str(variable)
-  S = str(setting)
-
-  if ' ' in S:
-    S = '%s' % S
-
-  for line in fileinput.input(filepath, inplace=1):
-    if not line.lstrip(' ').startswith('#') and '=' in line:
-      _infile_var = str(line.split('=')[0].rstrip(' '))
-      _infile_set = str(line.split('=')[1].lstrip(' ').rstrip())
-      if var_found == False and _infile_var.rstrip(' ') == V:
-        var_found = True
-        if _infile_set.lstrip(' ') == S:
-          already_set = True
-        else:
-          line = "%s %s\n" % (V, S)
-
-    sys.stdout.write(line)
-
-  if not var_found:
-    with open(filepath, "a") as f:
-      f.write("%s \t %s\n" % (V, S))
-  elif already_set == True:
-    pass
-  else:
-    pass
-
-  return
-
-
-def create_file(file_path):
-  try:
-    file = open(file_path, 'w')
-    file.close()
-  except:
-    print('Unable to create file: ' + file_path)
-    sys.exit(0)
-
-
-def get_hdp_version():
-  try:
-    command = 'hdp-select status hadoop-client'
-    return_code, hdp_output = shell.call(command, timeout=20)
-  except Exception, e:
-    Logger.error(str(e))
-    raise Fail('Unable to execute hdp-select command to retrieve the version.')
-
-  if return_code != 0:
-    raise Fail(
-      'Unable to determine the current version because of a non-zero return code of {0}'.format(str(return_code)))
-
-  hdp_version = re.sub('hadoop-client - ', '', hdp_output)
-  hdp_version = hdp_version.rstrip()
-  match = re.match('[0-9]+.[0-9]+.[0-9]+.[0-9]+-[0-9]+', hdp_version)
-
-  if match is None:
-    raise Fail('Failed to get extracted version')
-
-  return hdp_version

+ 10 - 11
ambari-server/src/main/resources/common-services/SPARK/1.2.0.2.2/package/scripts/spark_client.py

@@ -28,29 +28,28 @@ from setup_spark import setup_spark
 
 
 class SparkClient(Script):
-  def get_stack_to_component(self):
-    return {"HDP": "spark-client"}
-
-  def pre_rolling_restart(self, env):
-    import params
-
-    env.set_params(params)
-    if params.version and compare_versions(format_hdp_stack_version(params.version), '2.2.0.0') >= 0:
-      Execute(format("hdp-select set spark-client {version}"))
-
   def install(self, env):
     self.install_packages(env)
     self.configure(env)
 
   def configure(self, env):
     import params
-
     env.set_params(params)
+    
     setup_spark(env, 'client', action = 'config')
 
   def status(self, env):
     raise ClientComponentHasNoStatus()
+  
+  def get_stack_to_component(self):
+    return {"HDP": "spark-client"}
 
+  def pre_rolling_restart(self, env):
+    import params
+
+    env.set_params(params)
+    if params.version and compare_versions(format_hdp_stack_version(params.version), '2.2.0.0') >= 0:
+      Execute(format("hdp-select set spark-client {version}"))
 
 if __name__ == "__main__":
   SparkClient().execute()

+ 46 - 0
ambari-server/src/main/resources/common-services/SPARK/1.2.0.2.2/package/scripts/spark_service.py

@@ -0,0 +1,46 @@
+#!/usr/bin/env python
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+from resource_management import *
+
+def spark_service(action):
+  import params
+  
+  if action == 'start':
+    if params.security_enabled:
+      spark_kinit_cmd = format("{kinit_path_local} -kt {spark_kerberos_keytab} {spark_principal}; ")
+      Execute(spark_kinit_cmd, user=params.spark_user)
+
+    copy_tarballs_to_hdfs('tez', 'spark-historyserver', params.spark_user, params.hdfs_user, params.user_group)
+
+    no_op_test = format(
+      'ls {spark_history_server_pid_file} >/dev/null 2>&1 && ps -p `cat {spark_history_server_pid_file}` >/dev/null 2>&1')
+    Execute(format('{spark_history_server_start}'),
+            user=params.spark_user,
+            environment={'JAVA_HOME': params.java_home},
+            not_if=no_op_test
+    )
+  elif action == 'stop':
+    Execute(format('{spark_history_server_stop}'),
+            user=params.spark_user,
+            environment={'JAVA_HOME': params.java_home}
+    )
+    File(params.spark_history_server_pid_file,
+         action="delete"
+    )

文件差异内容过多而无法显示
+ 6 - 10
ambari-server/src/test/python/stacks/2.2/KNOX/test_knox_gateway.py


+ 10 - 11
ambari-server/src/test/python/stacks/2.2/SLIDER/test_slider_client.py

@@ -79,12 +79,11 @@ class TestSliderClient(RMFTestCase):
                        target = RMFTestCase.TARGET_COMMON_SERVICES
     )
 
-    self.assertResourceCalled('Execute',
-                              '/usr/bin/kinit -kt /etc/security/keytabs/smokeuser.headless.keytab ambari-qa@EXAMPLE.COM; /usr/lib/slider/bin/slider list',
-                              logoutput=True,
-                              tries=3,
-                              user='ambari-qa',
-                              try_sleep=5,
+    self.assertResourceCalled('Execute', '/usr/bin/kinit -kt /etc/security/keytabs/smokeuser.headless.keytab ambari-qa@EXAMPLE.COM; /usr/hdp/current/slider-client/bin/slider list',
+        logoutput = True,
+        tries = 3,
+        user = 'ambari-qa',
+        try_sleep = 5,
     )
     self.assertNoMoreResources()
 
@@ -98,11 +97,11 @@ class TestSliderClient(RMFTestCase):
                        target = RMFTestCase.TARGET_COMMON_SERVICES
     )
 
-    self.assertResourceCalled('Execute', ' /usr/lib/slider/bin/slider list',
-                              logoutput=True,
-                              tries=3,
-                              user='ambari-qa',
-                              try_sleep=5,
+    self.assertResourceCalled('Execute', ' /usr/hdp/current/slider-client/bin/slider list',
+        logoutput = True,
+        tries = 3,
+        user = 'ambari-qa',
+        try_sleep = 5,
     )
     self.assertNoMoreResources()
 

+ 180 - 75
ambari-server/src/test/python/stacks/2.2/SPARK/test_job_history_server.py

@@ -17,90 +17,195 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 See the License for the specific language governing permissions and
 limitations under the License.
 '''
-import sys
-import os
 from mock.mock import MagicMock, patch
-
 from stacks.utils.RMFTestCase import *
-from resource_management.core import shell
-from resource_management.libraries.functions import dynamic_variable_interpretation
 
+@patch("resource_management.libraries.functions.get_hdp_version", new=MagicMock(return_value="2.3.0.0-1597"))
 class TestJobHistoryServer(RMFTestCase):
   COMMON_SERVICES_PACKAGE_DIR = "SPARK/1.2.0.2.2/package"
   STACK_VERSION = "2.2"
 
-  def setUp(self):
-    sys.path.insert(0, os.path.join(os.getcwd(),
-      "../../main/resources/common-services", self.COMMON_SERVICES_PACKAGE_DIR,
-      "scripts"))
-
-  @patch.object(shell, "call")
-  @patch("setup_spark.create_file")
-  @patch("setup_spark.write_properties_to_file")
-  @patch.object(dynamic_variable_interpretation, "copy_tarballs_to_hdfs")
-  def test_start(self, copy_tarball_mock, write_properties_to_file_mock, create_file_mock, call_mock):
-    hdp_version = "2.2.2.0-2538"
-    call_mock.return_value = (0, hdp_version)
-    copy_tarball_mock.return_value = 0
-
+  def test_configure_default(self):
     self.executeScript(self.COMMON_SERVICES_PACKAGE_DIR + "/scripts/job_history_server.py",
-                         classname="JobHistoryServer",
-                         command="start",
-                         config_file="spark-job-history-server.json",
-                         hdp_stack_version=self.STACK_VERSION,
-                         target=RMFTestCase.TARGET_COMMON_SERVICES
+                   classname = "JobHistoryServer",
+                   command = "configure",
+                   config_file="default.json",
+                   hdp_stack_version = self.STACK_VERSION,
+                   target = RMFTestCase.TARGET_COMMON_SERVICES
     )
-
-    self.assertTrue(create_file_mock.called)
-    self.assertTrue(write_properties_to_file_mock.called)
-
-
-    self.assertResourceCalled("Directory", "/var/run/spark",
-                              owner="spark",
-                              group="hadoop",
-                              recursive=True
-    )
-    self.assertResourceCalled("Directory", "/var/log/spark",
-                              owner="spark",
-                              group="hadoop",
-                              recursive=True
-    )
-    self.assertResourceCalled("HdfsDirectory", "/user/spark",
-                              security_enabled=False,
-                              keytab=UnknownConfigurationMock(),
-                              conf_dir="/etc/hadoop/conf",
-                              hdfs_user="hdfs",
-                              kinit_path_local="/usr/bin/kinit",
-                              mode=509,
-                              owner="spark",
-                              bin_dir="/usr/hdp/current/hadoop-client/bin",
-                              action=["create"]
-    )
-    self.assertResourceCalled("File", "/etc/spark/conf/spark-env.sh",
-                              owner="spark",
-                              group="spark",
-                              content=InlineTemplate(self.getConfig()['configurations']['spark-env']['content'])
-    )
-    self.assertResourceCalled("File", "/etc/spark/conf/log4j.properties",
-                              owner="spark",
-                              group="spark",
-                              content=self.getConfig()['configurations']['spark-log4j-properties']['content']
-    )
-    self.assertResourceCalled("File", "/etc/spark/conf/metrics.properties",
-                              owner="spark",
-                              group="spark",
-                              content=InlineTemplate(self.getConfig()['configurations']['spark-metrics-properties']['content'])
-    )
-    self.assertResourceCalled("File", "/etc/spark/conf/java-opts",
-                              owner="spark",
-                              group="spark",
-                              content="  -Dhdp.version=" + hdp_version
+    self.assert_configure_default()
+    self.assertNoMoreResources()
+    
+  def test_start_default(self):
+    self.executeScript(self.COMMON_SERVICES_PACKAGE_DIR + "/scripts/job_history_server.py",
+                   classname = "JobHistoryServer",
+                   command = "start",
+                   config_file="default.json",
+                   hdp_stack_version = self.STACK_VERSION,
+                   target = RMFTestCase.TARGET_COMMON_SERVICES
     )
-
-    copy_tarball_mock.assert_called_with("tez", "spark-historyserver", "spark", "hdfs", "hadoop")
+    self.assert_configure_default()
+    self.assertResourceCalled('Execute', '/usr/hdp/current/spark-client/sbin/start-history-server.sh',
+        environment = {'JAVA_HOME': u'/usr/jdk64/jdk1.7.0_45'},
+        not_if = 'ls /var/run/spark/spark-spark-org.apache.spark.deploy.history.HistoryServer-1.pid >/dev/null 2>&1 && ps -p `cat /var/run/spark/spark-spark-org.apache.spark.deploy.history.HistoryServer-1.pid` >/dev/null 2>&1',
+        user = 'spark',
+    )
+    self.assertNoMoreResources()
+    
+  def test_stop_default(self):
+    self.executeScript(self.COMMON_SERVICES_PACKAGE_DIR + "/scripts/job_history_server.py",
+                   classname = "JobHistoryServer",
+                   command = "stop",
+                   config_file="default.json",
+                   hdp_stack_version = self.STACK_VERSION,
+                   target = RMFTestCase.TARGET_COMMON_SERVICES
+    )
+    self.assertResourceCalled('Execute', '/usr/hdp/current/spark-client/sbin/stop-history-server.sh',
+        environment = {'JAVA_HOME': u'/usr/jdk64/jdk1.7.0_45'},
+        user = 'spark',
+    )
+    self.assertResourceCalled('File', '/var/run/spark/spark-spark-org.apache.spark.deploy.history.HistoryServer-1.pid',
+        action = ['delete'],
+    )
+    self.assertNoMoreResources()
     
-    self.assertResourceCalled("Execute", "/usr/hdp/current/spark-historyserver/sbin/start-history-server.sh",
-                              not_if="ls /var/run/spark/spark-spark-org.apache.spark.deploy.history.HistoryServer-1.pid >/dev/null 2>&1 && ps -p `cat /var/run/spark/spark-spark-org.apache.spark.deploy.history.HistoryServer-1.pid` >/dev/null 2>&1",
-                              environment={'JAVA_HOME': '/usr/jdk64/jdk1.7.0_67'},
-                              user="spark"
+  def test_configure_secured(self):
+    self.executeScript(self.COMMON_SERVICES_PACKAGE_DIR + "/scripts/job_history_server.py",
+                   classname = "JobHistoryServer",
+                   command = "configure",
+                   config_file="secured.json",
+                   hdp_stack_version = self.STACK_VERSION,
+                   target = RMFTestCase.TARGET_COMMON_SERVICES
+    )
+    self.assert_configure_secured()
+    self.assertNoMoreResources()
+    
+  def test_start_secured(self):
+    self.executeScript(self.COMMON_SERVICES_PACKAGE_DIR + "/scripts/job_history_server.py",
+                   classname = "JobHistoryServer",
+                   command = "start",
+                   config_file="secured.json",
+                   hdp_stack_version = self.STACK_VERSION,
+                   target = RMFTestCase.TARGET_COMMON_SERVICES
+    )
+    self.assert_configure_secured()
+    self.assertResourceCalled('Execute', '/usr/bin/kinit -kt /etc/security/keytabs/spark.service.keytab spark/localhost@EXAMPLE.COM; ',
+        user = 'spark',
+    )
+    self.assertResourceCalled('Execute', '/usr/hdp/current/spark-client/sbin/start-history-server.sh',
+        environment = {'JAVA_HOME': u'/usr/jdk64/jdk1.7.0_45'},
+        not_if = 'ls /var/run/spark/spark-spark-org.apache.spark.deploy.history.HistoryServer-1.pid >/dev/null 2>&1 && ps -p `cat /var/run/spark/spark-spark-org.apache.spark.deploy.history.HistoryServer-1.pid` >/dev/null 2>&1',
+        user = 'spark',
+    )
+    self.assertNoMoreResources()
+    
+  def test_stop_secured(self):
+    self.executeScript(self.COMMON_SERVICES_PACKAGE_DIR + "/scripts/job_history_server.py",
+                   classname = "JobHistoryServer",
+                   command = "stop",
+                   config_file="secured.json",
+                   hdp_stack_version = self.STACK_VERSION,
+                   target = RMFTestCase.TARGET_COMMON_SERVICES
+    )
+    self.assertResourceCalled('Execute', '/usr/hdp/current/spark-client/sbin/stop-history-server.sh',
+        environment = {'JAVA_HOME': u'/usr/jdk64/jdk1.7.0_45'},
+        user = 'spark',
+    )
+    self.assertResourceCalled('File', '/var/run/spark/spark-spark-org.apache.spark.deploy.history.HistoryServer-1.pid',
+        action = ['delete'],
+    )
+    self.assertNoMoreResources()
+
+  def assert_configure_default(self):
+    self.assertResourceCalled('Directory', '/var/run/spark',
+        owner = 'spark',
+        group = 'hadoop',
+        recursive = True,
+    )
+    self.assertResourceCalled('Directory', '/var/log/spark',
+        owner = 'spark',
+        group = 'hadoop',
+        recursive = True,
+    )
+    self.assertResourceCalled('HdfsDirectory', '/user/spark',
+        security_enabled = False,
+        keytab = UnknownConfigurationMock(),
+        conf_dir = '/etc/hadoop/conf',
+        hdfs_user = 'hdfs',
+        kinit_path_local = '/usr/bin/kinit',
+        mode = 0775,
+        owner = 'spark',
+        bin_dir = '/usr/hdp/current/hadoop-client/bin',
+        action = ['create'],
+    )
+    self.assertResourceCalled('PropertiesFile', '/etc/spark/conf/spark-defaults.conf',
+        key_value_delimiter = ' ',
+        properties = self.getConfig()['configurations']['spark-defaults'],
+    )
+    self.assertResourceCalled('File', '/etc/spark/conf/spark-env.sh',
+        content = InlineTemplate(self.getConfig()['configurations']['spark-env']['content']),
+        owner = 'spark',
+        group = 'spark',
+    )
+    self.assertResourceCalled('File', '/etc/spark/conf/log4j.properties',
+        content = '\n# Set everything to be logged to the console\nlog4j.rootCategory=INFO, console\nlog4j.appender.console=org.apache.log4j.ConsoleAppender\nlog4j.appender.console.target=System.err\nlog4j.appender.console.layout=org.apache.log4j.PatternLayout\nlog4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n\n\n# Settings to quiet third party logs that are too verbose\nlog4j.logger.org.eclipse.jetty=WARN\nlog4j.logger.org.eclipse.jetty.util.component.AbstractLifeCycle=ERROR\nlog4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO\nlog4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO',
+        owner = 'spark',
+        group = 'spark',
+    )
+    self.assertResourceCalled('File', '/etc/spark/conf/metrics.properties',
+        content = InlineTemplate(self.getConfig()['configurations']['spark-metrics-properties']['content']),
+        owner = 'spark',
+        group = 'spark',
+    )
+    self.assertResourceCalled('File', '/etc/spark/conf/java-opts',
+        content = '  -Dhdp.version=2.3.0.0-1597',
+        owner = 'spark',
+        group = 'spark',
+    )
+      
+  def assert_configure_secured(self):
+    self.assertResourceCalled('Directory', '/var/run/spark',
+        owner = 'spark',
+        group = 'hadoop',
+        recursive = True,
+    )
+    self.assertResourceCalled('Directory', '/var/log/spark',
+        owner = 'spark',
+        group = 'hadoop',
+        recursive = True,
+    )
+    self.assertResourceCalled('HdfsDirectory', '/user/spark',
+        security_enabled = True,
+        keytab = UnknownConfigurationMock(),
+        conf_dir = '/etc/hadoop/conf',
+        hdfs_user = UnknownConfigurationMock(),
+        kinit_path_local = '/usr/bin/kinit',
+        mode = 0775,
+        owner = 'spark',
+        bin_dir = '/usr/hdp/current/hadoop-client/bin',
+        action = ['create'],
+    )
+    self.assertResourceCalled('PropertiesFile', '/etc/spark/conf/spark-defaults.conf',
+        key_value_delimiter = ' ',
+        properties = self.getConfig()['configurations']['spark-defaults'],
+    )
+    self.assertResourceCalled('File', '/etc/spark/conf/spark-env.sh',
+        content = InlineTemplate(self.getConfig()['configurations']['spark-env']['content']),
+        owner = 'spark',
+        group = 'spark',
+    )
+    self.assertResourceCalled('File', '/etc/spark/conf/log4j.properties',
+        content = '\n# Set everything to be logged to the console\nlog4j.rootCategory=INFO, console\nlog4j.appender.console=org.apache.log4j.ConsoleAppender\nlog4j.appender.console.target=System.err\nlog4j.appender.console.layout=org.apache.log4j.PatternLayout\nlog4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n\n\n# Settings to quiet third party logs that are too verbose\nlog4j.logger.org.eclipse.jetty=WARN\nlog4j.logger.org.eclipse.jetty.util.component.AbstractLifeCycle=ERROR\nlog4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO\nlog4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO',
+        owner = 'spark',
+        group = 'spark',
+    )
+    self.assertResourceCalled('File', '/etc/spark/conf/metrics.properties',
+        content = InlineTemplate(self.getConfig()['configurations']['spark-metrics-properties']['content']),
+        owner = 'spark',
+        group = 'spark',
+    )
+    self.assertResourceCalled('File', '/etc/spark/conf/java-opts',
+        content = '  -Dhdp.version=2.3.0.0-1597',
+        owner = 'spark',
+        group = 'spark',
     )

+ 120 - 0
ambari-server/src/test/python/stacks/2.2/SPARK/test_spark_client.py

@@ -0,0 +1,120 @@
+#!/usr/bin/env python
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+from mock.mock import MagicMock, patch
+from stacks.utils.RMFTestCase import *
+
+@patch("resource_management.libraries.functions.get_hdp_version", new=MagicMock(return_value="2.3.0.0-1597"))
+class TestSparkClient(RMFTestCase):
+  COMMON_SERVICES_PACKAGE_DIR = "SPARK/1.2.0.2.2/package"
+  STACK_VERSION = "2.2"
+
+  def test_configure_default(self):
+    self.executeScript(self.COMMON_SERVICES_PACKAGE_DIR + "/scripts/spark_client.py",
+                   classname = "SparkClient",
+                   command = "configure",
+                   config_file="default.json",
+                   hdp_stack_version = self.STACK_VERSION,
+                   target = RMFTestCase.TARGET_COMMON_SERVICES
+    )
+    self.assert_configure_default()
+    self.assertNoMoreResources()
+    
+  def test_configure_secured(self):
+    self.executeScript(self.COMMON_SERVICES_PACKAGE_DIR + "/scripts/spark_client.py",
+                   classname = "SparkClient",
+                   command = "configure",
+                   config_file="secured.json",
+                   hdp_stack_version = self.STACK_VERSION,
+                   target = RMFTestCase.TARGET_COMMON_SERVICES
+    )
+    self.assert_configure_secured()
+    self.assertNoMoreResources()
+
+  def assert_configure_default(self):
+    self.assertResourceCalled('Directory', '/var/run/spark',
+        owner = 'spark',
+        group = 'hadoop',
+        recursive = True,
+    )
+    self.assertResourceCalled('Directory', '/var/log/spark',
+        owner = 'spark',
+        group = 'hadoop',
+        recursive = True,
+    )
+    self.assertResourceCalled('PropertiesFile', '/etc/spark/conf/spark-defaults.conf',
+        key_value_delimiter = ' ',
+        properties = self.getConfig()['configurations']['spark-defaults'],
+    )
+    self.assertResourceCalled('File', '/etc/spark/conf/spark-env.sh',
+        content = InlineTemplate(self.getConfig()['configurations']['spark-env']['content']),
+        owner = 'spark',
+        group = 'spark',
+    )
+    self.assertResourceCalled('File', '/etc/spark/conf/log4j.properties',
+        content = '\n# Set everything to be logged to the console\nlog4j.rootCategory=INFO, console\nlog4j.appender.console=org.apache.log4j.ConsoleAppender\nlog4j.appender.console.target=System.err\nlog4j.appender.console.layout=org.apache.log4j.PatternLayout\nlog4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n\n\n# Settings to quiet third party logs that are too verbose\nlog4j.logger.org.eclipse.jetty=WARN\nlog4j.logger.org.eclipse.jetty.util.component.AbstractLifeCycle=ERROR\nlog4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO\nlog4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO',
+        owner = 'spark',
+        group = 'spark',
+    )
+    self.assertResourceCalled('File', '/etc/spark/conf/metrics.properties',
+        content = InlineTemplate(self.getConfig()['configurations']['spark-metrics-properties']['content']),
+        owner = 'spark',
+        group = 'spark',
+    )
+    self.assertResourceCalled('File', '/etc/spark/conf/java-opts',
+        content = '  -Dhdp.version=2.3.0.0-1597',
+        owner = 'spark',
+        group = 'spark',
+    )
+      
+  def assert_configure_secured(self):
+    self.assertResourceCalled('Directory', '/var/run/spark',
+        owner = 'spark',
+        group = 'hadoop',
+        recursive = True,
+    )
+    self.assertResourceCalled('Directory', '/var/log/spark',
+        owner = 'spark',
+        group = 'hadoop',
+        recursive = True,
+    )
+    self.assertResourceCalled('PropertiesFile', '/etc/spark/conf/spark-defaults.conf',
+        key_value_delimiter = ' ',
+        properties = self.getConfig()['configurations']['spark-defaults'],
+    )
+    self.assertResourceCalled('File', '/etc/spark/conf/spark-env.sh',
+        content = InlineTemplate(self.getConfig()['configurations']['spark-env']['content']),
+        owner = 'spark',
+        group = 'spark',
+    )
+    self.assertResourceCalled('File', '/etc/spark/conf/log4j.properties',
+        content = '\n# Set everything to be logged to the console\nlog4j.rootCategory=INFO, console\nlog4j.appender.console=org.apache.log4j.ConsoleAppender\nlog4j.appender.console.target=System.err\nlog4j.appender.console.layout=org.apache.log4j.PatternLayout\nlog4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n\n\n# Settings to quiet third party logs that are too verbose\nlog4j.logger.org.eclipse.jetty=WARN\nlog4j.logger.org.eclipse.jetty.util.component.AbstractLifeCycle=ERROR\nlog4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO\nlog4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO',
+        owner = 'spark',
+        group = 'spark',
+    )
+    self.assertResourceCalled('File', '/etc/spark/conf/metrics.properties',
+        content = InlineTemplate(self.getConfig()['configurations']['spark-metrics-properties']['content']),
+        owner = 'spark',
+        group = 'spark',
+    )
+    self.assertResourceCalled('File', '/etc/spark/conf/java-opts',
+        content = '  -Dhdp.version=2.3.0.0-1597',
+        owner = 'spark',
+        group = 'spark',
+    )

+ 58 - 0
ambari-server/src/test/python/stacks/2.2/SPARK/test_spark_service_check.py

@@ -0,0 +1,58 @@
+#!/usr/bin/env python
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+from mock.mock import MagicMock, call, patch
+from stacks.utils.RMFTestCase import *
+
+@patch("resource_management.libraries.functions.get_hdp_version", new=MagicMock(return_value="2.3.0.0-1597"))
+class TestServiceCheck(RMFTestCase):
+  COMMON_SERVICES_PACKAGE_DIR = "SPARK/1.2.0.2.2/package"
+  STACK_VERSION = "2.2"
+
+  def test_service_check_default(self):
+    self.executeScript(self.COMMON_SERVICES_PACKAGE_DIR + "/scripts/service_check.py",
+                        classname="SparkServiceCheck",
+                        command="service_check",
+                        config_file="default.json",
+                        hdp_stack_version = self.STACK_VERSION,
+                        target = RMFTestCase.TARGET_COMMON_SERVICES
+    )
+    self.assertResourceCalled('Execute', "curl -s -o /dev/null -w'%{http_code}' --negotiate -u: -k http://localhost:18080 | grep 200",
+        tries = 10,
+        try_sleep = 3,
+    )
+    self.assertNoMoreResources()
+    
+    
+  def test_service_check_secured(self):
+    self.executeScript(self.COMMON_SERVICES_PACKAGE_DIR + "/scripts/service_check.py",
+                        classname="SparkServiceCheck",
+                        command="service_check",
+                        config_file="secured.json",
+                        hdp_stack_version = self.STACK_VERSION,
+                        target = RMFTestCase.TARGET_COMMON_SERVICES
+    )
+    self.assertResourceCalled('Execute', '/usr/bin/kinit -kt /etc/security/keytabs/spark.service.keytab spark/localhost@EXAMPLE.COM; ',
+        user = 'spark',
+    )
+    self.assertResourceCalled('Execute', "curl -s -o /dev/null -w'%{http_code}' --negotiate -u: -k http://localhost:18080 | grep 200",
+        tries = 10,
+        try_sleep = 3,
+    )
+    self.assertNoMoreResources()

文件差异内容过多而无法显示
+ 32 - 1
ambari-server/src/test/python/stacks/2.2/configs/default.json


文件差异内容过多而无法显示
+ 32 - 1
ambari-server/src/test/python/stacks/2.2/configs/secured.json


部分文件因为文件数量过多而无法显示