소스 검색

AMBARI-4035. Add hook support for pluggable services (dlysnichenko)

Lisnichenko Dmitro 11 년 전
부모
커밋
ce08fc8ae9

+ 46 - 8
ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py

@@ -41,6 +41,9 @@ class CustomServiceOrchestrator():
   SCRIPT_TYPE_PYTHON = "PYTHON"
   CUSTOM_ACTION_COMMAND = 'ACTIONEXECUTE'
 
+  PRE_HOOK_PREFIX="before"
+  POST_HOOK_PREFIX="after"
+
   def __init__(self, config):
     self.config = config
     self.tmp_dir = config.get('agent', 'prefix')
@@ -56,27 +59,47 @@ class CustomServiceOrchestrator():
       command_name = command['roleCommand']
       timeout = int(command['commandParams']['command_timeout'])
       task_id = command['taskId']
+
       if command_name == self.CUSTOM_ACTION_COMMAND:
         base_dir = self.config.get('python', 'custom_actions_dir')
         script_path = os.path.join(base_dir, script)
+        hook_dir = None
       else:
         stack_name = command['hostLevelParams']['stack_name']
         stack_version = command['hostLevelParams']['stack_version']
+        hook_dir = self.file_cache.get_hook_base_dir(stack_name, stack_version)
         metadata_folder = command['commandParams']['service_metadata_folder']
         base_dir = self.file_cache.get_service_base_dir(
           stack_name, stack_version, metadata_folder, component_name)
         script_path = self.resolve_script_path(base_dir, script, script_type)
 
-      tmpstrucoutfile = os.path.join(self.tmp_dir, "structured-out-{0}.json".
-        format(task_id))
-      if script_type.upper() == self.SCRIPT_TYPE_PYTHON:
-        json_path = self.dump_command_to_json(command)
-        script_params = [command_name, json_path, base_dir]
-        ret = self.python_executor.run_file(script_path, script_params,
-          tmpoutfile, tmperrfile, timeout, tmpstrucoutfile)
-      else:
+      tmpstrucoutfile = os.path.join(self.tmp_dir,
+                                    "structured-out-{0}.json".format(task_id))
+      if script_type.upper() != self.SCRIPT_TYPE_PYTHON:
+      # We don't support anything else yet
         message = "Unknown script type {0}".format(script_type)
         raise AgentException(message)
+      # Execute command using proper interpreter
+      json_path = self.dump_command_to_json(command)
+      script_params = [command_name, json_path, base_dir]
+      pre_hook = self.resolve_hook_script_path(hook_dir,
+          self.PRE_HOOK_PREFIX, command_name, script_type)
+      post_hook = self.resolve_hook_script_path(hook_dir,
+          self.POST_HOOK_PREFIX, command_name, script_type)
+      py_file_list = [pre_hook, script_path, post_hook]
+      # filter None values
+      filtered_py_file_list = [i for i in py_file_list if i]
+      # Executing hooks and script
+      ret = None
+      for py_file in filtered_py_file_list:
+        ret = self.python_executor.run_file(py_file, script_params,
+                               tmpoutfile, tmperrfile, timeout, tmpstrucoutfile)
+        if ret['exitcode'] != 0:
+          break
+
+      if not ret: # Something went wrong
+        raise AgentException("No script has been executed")
+
     except Exception: # We do not want to let agent fail completely
       exc_type, exc_obj, exc_tb = sys.exc_info()
       message = "Catched an exception while executing "\
@@ -102,6 +125,21 @@ class CustomServiceOrchestrator():
     return path
 
 
+  def resolve_hook_script_path(self, hook_base_dir, prefix, command_name, script_type):
+    """
+    Returns a path to hook script according to string prefix
+    and command name. If script does not exist, returns None
+    """
+    if not hook_base_dir:
+      return None
+    script_file = "{0}-{1}.py".format(prefix, command_name)
+    hook_script_path = os.path.join(hook_base_dir, script_file)
+    if not os.path.isfile(hook_script_path):
+      logger.debug("Hook script {0} not found, skipping".format(hook_script_path))
+      return None
+    return hook_script_path
+
+
   def dump_command_to_json(self, command):
     """
     Converts command to json file and returns file path

+ 16 - 0
ambari-agent/src/main/python/ambari_agent/FileCache.py

@@ -41,6 +41,7 @@ class FileCache():
     self.config = config
     self.cache_dir = config.get('agent', 'cache_dir')
 
+
   def get_service_base_dir(self, stack_name, stack_version, service, component):
     """
     Returns a base directory for service
@@ -61,3 +62,18 @@ class FileCache():
     return metadata_path
 
 
+  def get_hook_base_dir(self, stack_name, stack_version):
+    """
+    Returns a base directory for service
+    """
+    hook_base_path = os.path.join(self.cache_dir, "stacks", str(stack_name),
+                                 str(stack_version), "hooks")
+    if not os.path.isdir(hook_base_path):
+      # TODO: Metadata downloading will be implemented at Phase 2
+      # As of now, all stack definitions are packaged and distributed with
+      # agent rpm
+      message = "Hook scripts dir for not found at " \
+                "expected location {0}".format(hook_base_path)
+      raise AgentException(message)
+    return hook_base_path
+

+ 2 - 1
ambari-agent/src/main/python/resource_management/libraries/script/__init__.py

@@ -20,4 +20,5 @@ Ambari Agent
 
 """
 
-from resource_management.libraries.script.script import *
+from resource_management.libraries.script.script import *
+from resource_management.libraries.script.hook import *

+ 38 - 0
ambari-agent/src/main/python/resource_management/libraries/script/hook.py

@@ -0,0 +1,38 @@
+#!/usr/bin/env python2.6
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+
+__all__ = ["Hook"]
+
+from resource_management.libraries.script import Script
+
+class Hook(Script):
+  """
+  Executes a hook for acommand for custom service. stdout and stderr are written to
+  tmpoutfile and to tmperrfile respectively.
+  """
+
+  HOOK_METHOD_NAME = "hook" # This method is always executed at hooks
+
+
+  def choose_method_to_execute(self, command_name):
+    """
+    Changes loogics of resolving method name
+    """
+    return super(Hook, self).choose_method_to_execute(self.HOOK_METHOD_NAME)

+ 17 - 9
ambari-agent/src/main/python/resource_management/libraries/script/script.py

@@ -30,7 +30,7 @@ from resource_management.core.resources.packaging import Package
 from resource_management.libraries.script.config_dictionary import ConfigDictionary
 from resource_management.libraries.script.repo_installer import RepoInstaller
 
-class Script():
+class Script(object):
   """
   Executes a command for custom service. stdout and stderr are written to
   tmpoutfile and to tmperrfile respectively.
@@ -82,19 +82,27 @@ class Script():
     except IOError:
       logger.exception("Can not read json file with command parameters: ")
       sys.exit(1)
-    # Run class method mentioned by a command type
-    self_methods = dir(self)
-    if not command_name in self_methods:
-      logger.error("Script {0} has not method '{1}'".format(sys.argv[0], command_name))
-      sys.exit(1)
-    method = getattr(self, command_name)
+    # Run class method depending on a command type
     try:
+      method = self.choose_method_to_execute(command_name)
       with Environment(basedir) as env:
         method(env)
     except Fail:
-      logger.exception("Got exception while executing method '{0}':".format(command_name))
+      logger.exception("Got exception while executing command {0}:".format(command_name))
       sys.exit(1)
 
+
+  def choose_method_to_execute(self, command_name):
+    """
+    Returns a callable object that should be executed for a given command.
+    """
+    self_methods = dir(self)
+    if not command_name in self_methods:
+      raise Fail("Script {0} has not method '{1}'".format(sys.argv[0], command_name))
+    method = getattr(self, command_name)
+    return method
+
+
   @staticmethod
   def get_config():
     """
@@ -144,4 +152,4 @@ class Script():
     """
     print("Error: " + message)
     sys.stderr.write("Error: " + message)
-    sys.exit(1)
+    sys.exit(1)

+ 33 - 2
ambari-agent/src/test/python/ambari_agent/TestCustomServiceOrchestrator.py

@@ -101,11 +101,14 @@ class TestCustomServiceOrchestrator(TestCase):
 
 
   @patch.object(CustomServiceOrchestrator, "resolve_script_path")
+  @patch.object(CustomServiceOrchestrator, "resolve_hook_script_path")
   @patch.object(FileCache, "get_service_base_dir")
+  @patch.object(FileCache, "get_hook_base_dir")
   @patch.object(CustomServiceOrchestrator, "dump_command_to_json")
   @patch.object(PythonExecutor, "run_file")
   def test_runCommand(self, run_file_mock, dump_command_to_json_mock,
-                      get_service_base_dir_mock, resolve_script_path_mock):
+                      get_hook_base_dir_mock, get_service_base_dir_mock,
+                      resolve_hook_script_path_mock, resolve_script_path_mock):
     command = {
       'role' : 'REGION_SERVER',
       'hostLevelParams' : {
@@ -123,7 +126,9 @@ class TestCustomServiceOrchestrator(TestCase):
     }
     get_service_base_dir_mock.return_value = "/basedir/"
     resolve_script_path_mock.return_value = "/basedir/scriptpath"
+    resolve_hook_script_path_mock.return_value = "/basedir/hooks/hookpath"
     orchestrator = CustomServiceOrchestrator(self.config)
+    get_hook_base_dir_mock.return_value = "/hooks/"
     # normal run case
     run_file_mock.return_value = {
         'stdout' : 'sss',
@@ -133,6 +138,7 @@ class TestCustomServiceOrchestrator(TestCase):
     ret = orchestrator.runCommand(command, "out.txt", "err.txt")
     self.assertEqual(ret['exitcode'], 0)
     self.assertTrue(run_file_mock.called)
+    self.assertEqual(run_file_mock.call_count, 3)
 
     run_file_mock.reset_mock()
     # unknown script type case
@@ -145,7 +151,7 @@ class TestCustomServiceOrchestrator(TestCase):
 
   @patch.object(CustomServiceOrchestrator, "dump_command_to_json")
   @patch.object(PythonExecutor, "run_file")
-  def test_runCommand(self, run_file_mock, dump_command_to_json_mock):
+  def test_runCommand_custom_action(self, run_file_mock, dump_command_to_json_mock):
     _, script = tempfile.mkstemp()
     command = {
       'role' : 'any',
@@ -168,6 +174,31 @@ class TestCustomServiceOrchestrator(TestCase):
     ret = orchestrator.runCommand(command, "out.txt", "err.txt")
     self.assertEqual(ret['exitcode'], 0)
     self.assertTrue(run_file_mock.called)
+    # Hoooks are not supported for custom actions,
+    # that's why run_file() should be called only once
+    self.assertEqual(run_file_mock.call_count, 1)
+
+
+  @patch("os.path.isfile")
+  def test_resolve_hook_script_path(self, isfile_mock):
+
+    orchestrator = CustomServiceOrchestrator(self.config)
+    # Testing None param
+    res1 = orchestrator.resolve_hook_script_path(None, "prefix", "command",
+                                            "script_type")
+    self.assertEqual(res1, None)
+    # Testing existing hook script
+    isfile_mock.return_value = True
+    res2 = orchestrator.resolve_hook_script_path("/hooks_dir/", "prefix", "command",
+                                            "script_type")
+    self.assertEqual(res2, "/hooks_dir/prefix-command.py")
+    # Testing not existing hook script
+    isfile_mock.return_value = False
+    res3 = orchestrator.resolve_hook_script_path("/hooks_dir/", "prefix", "command",
+                                                 "script_type")
+    self.assertEqual(res3, None)
+    pass
+
 
   def tearDown(self):
     # enable stdout

+ 28 - 0
ambari-agent/src/test/python/ambari_agent/TestFileCache.py

@@ -36,6 +36,7 @@ from mock.mock import MagicMock, patch
 import StringIO
 import sys
 from ambari_agent import AgentException
+from AgentException import AgentException
 
 
 class TestFileCache(TestCase):
@@ -55,11 +56,38 @@ class TestFileCache(TestCase):
   @patch("os.path.isdir")
   def test_get_service_base_dir(self, isdir_mock):
     fileCache = FileCache(self.config)
+    # Check existing dir case
     isdir_mock.return_value = True
     base = fileCache.get_service_base_dir("HDP", "2.0.7",
                                           "HBASE", "REGION_SERVER")
     self.assertEqual(base, "/var/lib/ambari-agent/cache/stacks/HDP/2.0.7/"
                            "services/HBASE/package")
+    # Check absent dir case
+    isdir_mock.return_value = False
+    try:
+      fileCache.get_service_base_dir("HDP", "2.0.7",
+                                          "HBASE", "REGION_SERVER")
+      self.fail("Should throw an exception")
+    except AgentException:
+      pass # Expected
+
+
+
+
+  @patch("os.path.isdir")
+  def test_get_hook_base_dir(self, isdir_mock):
+    fileCache = FileCache(self.config)
+    # Check existing dir case
+    isdir_mock.return_value = True
+    base = fileCache.get_hook_base_dir("HDP", "2.0.7")
+    self.assertEqual(base, "/var/lib/ambari-agent/cache/stacks/HDP/2.0.7/hooks")
+    # Check absent dir case
+    isdir_mock.return_value = False
+    try:
+      fileCache.get_hook_base_dir("HDP", "2.0.7")
+      self.fail("Should throw an exception")
+    except AgentException:
+      pass # Expected
 
 
   def tearDown(self):

+ 2 - 1
ambari-agent/src/test/python/ambari_agent/TestPuppetExecutor.py

@@ -257,7 +257,8 @@ class TestPuppetExecutor(TestCase):
     result = {  }
     puppetEnv = { "RUBYLIB" : ""}
     subproc_mock.returncode = 0
-    thread = Thread(target =  executor_mock.runPuppetFile, args = ("fake_puppetFile", result, puppetEnv, tmpoutfile, tmperrfile))
+    thread = Thread(target =  executor_mock.runPuppetFile, args = ("fake_puppetFile",
+                            result, puppetEnv, tmpoutfile, tmperrfile))
     thread.start()
     time.sleep(0.1)
     subproc_mock.should_finish_event.set()

+ 3 - 1
ambari-agent/src/test/python/ambari_agent/TestPythonExecutor.py

@@ -71,6 +71,7 @@ class TestPythonExecutor(TestCase):
     executor = PythonExecutor("/tmp", AmbariConfig().getConfig())
     _, tmpoutfile = tempfile.mkstemp()
     _, tmperrfile = tempfile.mkstemp()
+    _, tmpstrucout = tempfile.mkstemp()
     PYTHON_TIMEOUT_SECONDS =  5
 
     def launch_python_subprocess_method(command, tmpout, tmperr):
@@ -83,7 +84,8 @@ class TestPythonExecutor(TestCase):
     executor.runShellKillPgrp = runShellKillPgrp_method
     subproc_mock.returncode = 0
     thread = Thread(target =  executor.run_file, args = ("fake_puppetFile", ["arg1", "arg2"],
-                                                      tmpoutfile, tmperrfile, PYTHON_TIMEOUT_SECONDS))
+                                                      tmpoutfile, tmperrfile,
+                                                      PYTHON_TIMEOUT_SECONDS, tmpstrucout))
     thread.start()
     time.sleep(0.1)
     subproc_mock.should_finish_event.set()

+ 3 - 1
ambari-server/src/main/java/org/apache/ambari/server/api/services/AmbariMetaInfo.java

@@ -84,6 +84,7 @@ public class AmbariMetaInfo {
       "oraclelinux6", "suse11", "sles11", "ubuntu12");
   
   public static final String SERVICE_METRIC_FILE_NAME = "metrics.json";
+  private final static String HOOKS_DIR = "hooks";
 
   /**
    * This string is used in placeholder in places that are common for
@@ -105,7 +106,8 @@ public class AmbariMetaInfo {
   public static final FilenameFilter FILENAME_FILTER = new FilenameFilter() {
     @Override
     public boolean accept(File dir, String s) {
-      if (s.equals(".svn") || s.equals(".git"))
+      if (s.equals(".svn") || s.equals(".git") ||
+              s.equals(HOOKS_DIR)) // Hooks dir is not a service
         return false;
       return true;
     }

+ 30 - 0
ambari-server/src/main/resources/stacks/HDP/2.0._/hooks/before-START.py

@@ -0,0 +1,30 @@
+##!/usr/bin/env python2.6
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+"""
+
+import sys
+from resource_management import *
+
+class BeforeStartHook(Hook):
+
+  def hook(self, env):
+    Execute(("touch", "/tmp/hook-test"))
+
+if __name__ == "__main__":
+  BeforeStartHook().execute()