Przeglądaj źródła

AMBARI-3586. Introduce CustomServiceOrchestrator and basic script classes (dlysnichenko)

Lisnichenko Dmitro 11 lat temu
rodzic
commit
98adf59479

+ 1 - 0
ambari-agent/conf/unix/ambari-agent.ini

@@ -24,6 +24,7 @@ loglevel=INFO
 data_cleanup_interval=86400
 data_cleanup_max_age=2592000
 ping_port=8670
+cache_dir=/var/lib/ambari-agent/cache
 
 [stack]
 installprefix=/var/ambari-agent/

+ 9 - 1
ambari-agent/pom.xml

@@ -351,7 +351,15 @@
                 </source>
               </sources>
             </mapping>
-            <!-- -->
+            <mapping>
+              <!-- TODO: Remove when we introduce metadata downloading by agent-->
+              <directory>/var/lib/ambari-agent/cache/stacks</directory>
+              <sources>
+                <source>
+                  <location>../ambari-server/src/main/resources/stacks</location>
+                </source>
+              </sources>
+            </mapping>
           </mappings>
         </configuration>
       </plugin>

+ 11 - 4
ambari-agent/src/main/python/ambari_agent/ActionQueue.py

@@ -47,8 +47,13 @@ class ActionQueue(threading.Thread):
 
   STATUS_COMMAND = 'STATUS_COMMAND'
   EXECUTION_COMMAND = 'EXECUTION_COMMAND'
+  ROLE_COMMAND_INSTALL = 'INSTALL'
+  ROLE_COMMAND_START = 'START'
+  ROLE_COMMAND_STOP = 'STOP'
 
   IN_PROGRESS_STATUS = 'IN_PROGRESS'
+  COMPLETED_STATUS = 'COMPLETED'
+  FAILED_STATUS = 'FAILED'
 
   def __init__(self, config, controller):
     super(ActionQueue, self).__init__()
@@ -119,6 +124,7 @@ class ActionQueue(threading.Thread):
       'status': self.IN_PROGRESS_STATUS
     })
     self.commandStatuses.put_command_status(command, in_progress_status)
+    # TODO: Add CustomServiceOrchestrator call somewhere here
     # running command
     # Create a new instance of executor for the current thread
     puppetExecutor = PuppetExecutor.PuppetExecutor(
@@ -128,10 +134,11 @@ class ActionQueue(threading.Thread):
       self.config.get('agent', 'prefix'), self.config)
     commandresult = puppetExecutor.runCommand(command, in_progress_status['tmpout'],
       in_progress_status['tmperr'])
+
     # dumping results
-    status = "COMPLETED"
+    status = self.COMPLETED_STATUS
     if commandresult['exitcode'] != 0:
-      status = "FAILED"
+      status = self.FAILED_STATUS
     roleResult = self.commandStatuses.generate_report_template(command)
     # assume some puppet plumbing to run these commands
     roleResult.update({
@@ -146,13 +153,13 @@ class ActionQueue(threading.Thread):
       roleResult['stderr'] = 'None'
 
     # let ambari know that configuration tags were applied
-    if status == 'COMPLETED':
+    if status == self.COMPLETED_STATUS:
       configHandler = ActualConfigHandler(self.config)
       if command.has_key('configurationTags'):
         configHandler.write_actual(command['configurationTags'])
         roleResult['configurationTags'] = command['configurationTags']
 
-      if command.has_key('roleCommand') and command['roleCommand'] == 'START':
+      if command.has_key('roleCommand') and command['roleCommand'] == self.ROLE_COMMAND_START:
         configHandler.copy_to_component(command['role'])
         roleResult['configurationTags'] = configHandler.read_actual_component(command['role'])
     self.commandStatuses.put_command_status(command, roleResult)

+ 24 - 0
ambari-agent/src/main/python/ambari_agent/AgentException.py

@@ -0,0 +1,24 @@
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+
+class AgentException(Exception):
+  def __init__(self, value):
+    self.parameter = value
+
+  def __str__(self):
+    return repr(self.parameter)

+ 1 - 0
ambari-agent/src/main/python/ambari_agent/AmbariConfig.py

@@ -34,6 +34,7 @@ prefix=/tmp/ambari-agent
 data_cleanup_interval=86400
 data_cleanup_max_age=2592000
 ping_port=8670
+cache_dir=/var/lib/ambari-agent/cache
 
 [services]
 

+ 132 - 0
ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py

@@ -0,0 +1,132 @@
+#!/usr/bin/env python2.6
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+
+import logging
+import os
+import json, pprint
+import sys
+
+from FileCache import FileCache
+from AgentException import AgentException
+from PythonExecutor import PythonExecutor
+from AmbariConfig import AmbariConfig
+
+
+logger = logging.getLogger()
+
+class CustomServiceOrchestrator():
+  """
+  Executes a command for custom service. stdout and stderr are written to
+  tmpoutfile and to tmperrfile respectively.
+  """
+
+  SCRIPT_TYPE_PYTHON = "PYTHON"
+
+  def __init__(self, config):
+    self.config = config
+    self.tmp_dir = config.get('agent', 'prefix')
+    self.file_cache = FileCache(config)
+    self.python_executor = PythonExecutor(self.tmp_dir, config)
+
+
+  def runCommand(self, command, tmpoutfile, tmperrfile):
+    try:
+      # TODO: Adjust variables
+      service_name = command['serviceName']
+      component_name = command['role']
+      stack_name = command['stackName'] # TODO: add at the server side
+      stack_version = command['stackVersion'] # TODO: add at the server side
+      script_type = command['scriptType'] # TODO: add at the server side
+      script = command['script']
+      command_name = command['roleCommand']
+      timeout = int(command['timeout']) # TODO: add at the server side
+      base_dir = self.file_cache.get_service_base_dir(
+          stack_name, stack_version, service_name, component_name)
+      script_path = self.resolve_script_path(base_dir, script, script_type)
+      if script_type == self.SCRIPT_TYPE_PYTHON:
+        json_path = self.dump_command_to_json(command)
+        script_params = [command_name, json_path, base_dir]
+        ret = self.python_executor.run_file(
+          script_path, script_params, tmpoutfile, tmperrfile, timeout)
+      else:
+        message = "Unknown script type {0}".format(script_type)
+        raise AgentException(message)
+    except Exception: # We do not want to let agent fail completely
+      exc_type, exc_obj, exc_tb = sys.exc_info()
+      message = "Catched an exception while executing "\
+        "custom service command: {0}: {1}".format(exc_type, exc_obj)
+      logger.error(message)
+      ret = {
+        'stdout' : message,
+        'stderr' : message,
+        'exitCode': 1,
+      }
+    return ret
+
+
+  def resolve_script_path(self, base_dir, script, script_type):
+    """
+    Incapsulates logic of script location determination.
+    """
+    path = os.path.join(base_dir, "package", script)
+    if not os.path.exists(path):
+      message = "Script {0} does not exist".format(path)
+      raise AgentException(message)
+    return path
+
+
+  def dump_command_to_json(self, command):
+    """
+    Converts command to json file and returns file path
+    """
+    command_id = command['commandId']
+    file_path = os.path.join(self.tmp_dir, "command-{0}.json".format(command_id))
+    with open(file_path, "w") as f:
+      content = json.dumps(command)
+      f.write(content)
+    return file_path
+
+
+def main():
+  """
+  May be used for manual testing if needed
+  """
+  config = AmbariConfig().getConfig()
+  orchestrator = CustomServiceOrchestrator(config)
+  config.set('agent', 'prefix', "/tmp")
+  command = {
+    "serviceName" : "HBASE",
+    "role" : "HBASE_MASTER",
+    "stackName" : "HDP",
+    "stackVersion" : "1.2.0",
+    "scriptType" : "PYTHON",
+    "script" : "/tmp/1.py",
+    "roleCommand" : "START",
+    "timeout": 600
+  }
+
+  result = orchestrator.runCommand(command, "/tmp/out-1.txt", "/tmp/err-1.txt")
+  pprint.pprint(result)
+  pass
+
+
+
+if __name__ == "__main__":
+  main()

+ 66 - 0
ambari-agent/src/main/python/ambari_agent/FileCache.py

@@ -0,0 +1,66 @@
+#!/usr/bin/env python2.6
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+
+
+# TODO: Update class description
+
+
+import logging
+import Queue
+import threading
+import pprint
+import os
+import json
+from ServiceComponentMetadata import ServiceComponentMetadata
+from AgentException import AgentException
+
+logger = logging.getLogger()
+
+class FileCache():
+  """
+  Provides caching and lookup for service metadata files.
+  If service metadata is not available at cache,
+  downloads relevant files from the server.
+  """
+
+  def __init__(self, config):
+    self.service_component_pool = {}
+    self.config = config
+    self.cache_dir = config.get('agent', 'cache_dir')
+
+  def get_service_base_dir(self, stack_name, stack_version, service, component):
+    """
+    Returns a base directory for service
+    """
+    metadata_path = os.path.join(self.cache_dir, "stacks", str(stack_name),
+                                 str(stack_version), str(service))
+    if not os.path.isdir(metadata_path):
+      # TODO: Metadata downloading will be implemented at Phase 2
+      # As of now, all stack definitions are packaged and distributed with
+      # agent rpm
+      message = "Metadata dir for not found for a service " \
+                "(stackName = {0}, stackVersion = {1}, " \
+                "service = {2}, " \
+                "component = {3}".format(stack_name, stack_version,
+                                                 service, component)
+      raise AgentException(message)
+    return metadata_path
+
+

+ 19 - 14
ambari-agent/src/main/python/ambari_agent/PythonExecutor.py

@@ -29,9 +29,11 @@ import shell
 logger = logging.getLogger()
 
 class PythonExecutor:
-
-  # How many seconds will pass before running puppet is terminated on timeout
-  PYTHON_TIMEOUT_SECONDS = 600
+  """
+  Performs functionality for executing python scripts.
+  Warning: class maintains internal state. As a result, instances should not be
+  used as a singleton for a concurrent execution of python scripts
+  """
 
   NO_ERROR = "none"
   grep = Grep()
@@ -43,22 +45,25 @@ class PythonExecutor:
     self.config = config
     pass
 
-  def run_file(self, command, file, tmpoutfile, tmperrfile):
+  def run_file(self, script, script_params, tmpoutfile, tmperrfile, timeout):
     """
     Executes the specified python file in a separate subprocess.
     Method returns only when the subprocess is finished.
+    Params arg is a list of script parameters
+    Timeout meaning: how many seconds should pass before script execution
+    is forcibly terminated
     """
     tmpout =  open(tmpoutfile, 'w')
     tmperr =  open(tmperrfile, 'w')
-    pythonCommand = self.pythonCommand(file)
+    pythonCommand = self.pythonCommand(script, script_params)
     logger.info("Running command " + pprint.pformat(pythonCommand))
-    process = self.lauch_python_subprocess(pythonCommand, tmpout, tmperr)
+    process = self.launch_python_subprocess(pythonCommand, tmpout, tmperr)
     logger.debug("Launching watchdog thread")
     self.event.clear()
     self.python_process_has_been_killed = False
-    thread = Thread(target =  self.python_watchdog_func, args = (process, ))
+    thread = Thread(target =  self.python_watchdog_func, args = (process, timeout))
     thread.start()
-    # Waiting for process to finished or killed
+    # Waiting for the process to be either finished or killed
     process.communicate()
     self.event.set()
     thread.join()
@@ -68,14 +73,14 @@ class PythonExecutor:
     out = open(tmpoutfile, 'r').read()
     error = open(tmperrfile, 'r').read()
     if self.python_process_has_been_killed:
-      error = str(error) + "\n Puppet has been killed due to timeout"
+      error = str(error) + "\n Python script has been killed due to timeout"
       returncode = 999
     result = self.condenseOutput(out, error, returncode)
     logger.info("Result: %s" % result)
     return result
 
 
-  def lauch_python_subprocess(self, command, tmpout, tmperr):
+  def launch_python_subprocess(self, command, tmpout, tmperr):
     """
     Creates subprocess with given parameters. This functionality was moved to separate method
     to make possible unit testing
@@ -87,8 +92,8 @@ class PythonExecutor:
   def isSuccessfull(self, returncode):
     return not self.python_process_has_been_killed and returncode == 0
 
-  def pythonCommand(self, file):
-    puppetcommand = ['python', file]
+  def pythonCommand(self, script, script_params):
+    puppetcommand = ['python', script] + script_params
     return puppetcommand
 
   def condenseOutput(self, stdout, stderr, retcode):
@@ -100,8 +105,8 @@ class PythonExecutor:
     }
     return result
 
-  def python_watchdog_func(self, python):
-    self.event.wait(self.PYTHON_TIMEOUT_SECONDS)
+  def python_watchdog_func(self, python, timeout):
+    self.event.wait(timeout)
     if python.returncode is None:
       logger.error("Subprocess timed out and will be killed")
       shell.kill_process_with_children(python.pid)

+ 102 - 0
ambari-agent/src/main/python/resource_management/script.py

@@ -0,0 +1,102 @@
+#!/usr/bin/env python2.6
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+
+import sys
+import json
+import logging
+
+from resource_management.environment import Environment
+from resource_management.exceptions import Fail
+
+
+class Script():
+  """
+  Executes a command for custom service. stdout and stderr are written to
+  tmpoutfile and to tmperrfile respectively.
+  """
+
+  def __init__(self):
+    pass
+
+
+  def start(self, env, params):  # TODO: just for test runs; remove
+    env.set_prefixes("ddd")
+    print "Start!"
+    pass
+
+
+  def execute(self):
+    """
+    Sets up logging;
+    Parses command parameters and executes method relevant to command type
+    """
+    # set up logging (two separate loggers for stderr and stdout with different loglevels)
+    logger = logging.getLogger('resource_management')
+    logger.setLevel(logging.DEBUG)
+    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
+    chout = logging.StreamHandler(sys.stdout)
+    chout.setLevel(logging.DEBUG)
+    chout.setFormatter(formatter)
+    cherr = logging.StreamHandler(sys.stderr)
+    cherr.setLevel(logging.ERROR)
+    cherr.setFormatter(formatter)
+    logger.addHandler(cherr)
+    # parse arguments
+    if len(sys.argv) < 1+3:
+      logger.error("Script expects at least 3 arguments")
+      sys.exit(1)
+    command_type = str.lower(sys.argv[1])
+    # parse command parameters
+    command_data_file = sys.argv[2]
+    basedir = sys.argv[3]
+    try:
+      with open(command_data_file, "r") as f:
+        pass
+        params = json.load(f)
+    except IOError:
+      logger.exception("Can not read json file with command parameters: ")
+      sys.exit(1)
+    # Run class method mentioned by a command type
+    self_methods = dir(self)
+    if not command_type in self_methods:
+      logger.error("Script {0} has not method '{1}'".format(sys.argv[0], command_type))
+      sys.exit(1)
+    method = getattr(self, command_type)
+    try:
+      with Environment(basedir, params) as env:
+        method(env, params)
+      env.run()
+    except Fail:
+      logger.exception("Got exception while executing method '{0}':".format(command_type))
+      sys.exit(1)
+
+
+
+  def fail_with_error(self, message):
+    """
+    Prints error message and exits with non-zero exit code
+    """
+    print("Error: " + message)
+    sys.stderr.write("Error: " + message)
+    sys.exit(1)
+
+
+if __name__ == "__main__":
+  Script().execute()

+ 54 - 0
ambari-agent/src/test/python/TestCustomServiceOrchestrator.py

@@ -0,0 +1,54 @@
+#!/usr/bin/env python2.6
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+import ConfigParser
+
+import pprint
+
+from unittest import TestCase
+import threading
+import tempfile
+import time
+from threading import Thread
+
+from PythonExecutor import PythonExecutor
+from AmbariConfig import AmbariConfig
+from mock.mock import MagicMock, patch
+import StringIO
+import sys
+
+
+class TestCustomServiceOrchestrator(TestCase):
+
+  def setUp(self):
+    # disable stdout
+    out = StringIO.StringIO()
+    sys.stdout = out
+    # generate sample config
+    tmpdir = tempfile.gettempdir()
+    config = ConfigParser.RawConfigParser()
+    config.add_section('agent')
+    config.set('agent', 'prefix', tmpdir)
+
+
+  def tearDown(self):
+    # enable stdout
+    sys.stdout = sys.__stdout__
+
+

+ 14 - 14
ambari-agent/src/test/python/TestPythonExecutor.py

@@ -42,20 +42,20 @@ class TestPythonExecutor(TestCase):
     executor = PythonExecutor("/tmp", AmbariConfig().getConfig())
     _, tmpoutfile = tempfile.mkstemp()
     _, tmperrfile = tempfile.mkstemp()
-    executor.PYTHON_TIMEOUT_SECONDS = 0.1
+    PYTHON_TIMEOUT_SECONDS = 0.1
     kill_process_with_children_mock.side_effect = lambda pid : subproc_mock.terminate()
 
-    def lauch_python_subprocess_method(command, tmpout, tmperr):
+    def launch_python_subprocess_method(command, tmpout, tmperr):
       subproc_mock.tmpout = tmpout
       subproc_mock.tmperr = tmperr
       return subproc_mock
-    executor.lauch_python_subprocess = lauch_python_subprocess_method
+    executor.launch_python_subprocess = launch_python_subprocess_method
     runShellKillPgrp_method = MagicMock()
     runShellKillPgrp_method.side_effect = lambda python : python.terminate()
     executor.runShellKillPgrp = runShellKillPgrp_method
     subproc_mock.returncode = None
-    thread = Thread(target =  executor.run_file, args = ("fake_command",
-                                    "fake_puppetFile", tmpoutfile, tmperrfile))
+    thread = Thread(target =  executor.run_file, args = ("fake_puppetFile", ["arg1", "arg2"],
+                                                    tmpoutfile, tmperrfile, PYTHON_TIMEOUT_SECONDS))
     thread.start()
     time.sleep(0.1)
     subproc_mock.finished_event.wait()
@@ -70,19 +70,19 @@ class TestPythonExecutor(TestCase):
     executor = PythonExecutor("/tmp", AmbariConfig().getConfig())
     _, tmpoutfile = tempfile.mkstemp()
     _, tmperrfile = tempfile.mkstemp()
-    executor.PYTHON_TIMEOUT_SECONDS =  5
+    PYTHON_TIMEOUT_SECONDS =  5
 
-    def lauch_python_subprocess_method(command, tmpout, tmperr):
+    def launch_python_subprocess_method(command, tmpout, tmperr):
       subproc_mock.tmpout = tmpout
       subproc_mock.tmperr = tmperr
       return subproc_mock
-    executor.lauch_python_subprocess = lauch_python_subprocess_method
+    executor.launch_python_subprocess = launch_python_subprocess_method
     runShellKillPgrp_method = MagicMock()
     runShellKillPgrp_method.side_effect = lambda python : python.terminate()
     executor.runShellKillPgrp = runShellKillPgrp_method
     subproc_mock.returncode = 0
-    thread = Thread(target =  executor.run_file, args = ("fake_command",
-                                                         "fake_puppetFile", tmpoutfile, tmperrfile))
+    thread = Thread(target =  executor.run_file, args = ("fake_puppetFile", ["arg1", "arg2"],
+                                                      tmpoutfile, tmperrfile, PYTHON_TIMEOUT_SECONDS))
     thread.start()
     time.sleep(0.1)
     subproc_mock.should_finish_event.set()
@@ -96,19 +96,19 @@ class TestPythonExecutor(TestCase):
     executor = PythonExecutor("/tmp", AmbariConfig().getConfig())
     _, tmpoutfile = tempfile.mkstemp()
     _, tmperrfile = tempfile.mkstemp()
-    executor.PYTHON_TIMEOUT_SECONDS =  5
+    PYTHON_TIMEOUT_SECONDS =  5
 
-    def lauch_python_subprocess_method(command, tmpout, tmperr):
+    def launch_python_subprocess_method(command, tmpout, tmperr):
       subproc_mock.tmpout = tmpout
       subproc_mock.tmperr = tmperr
       return subproc_mock
-    executor.lauch_python_subprocess = lauch_python_subprocess_method
+    executor.launch_python_subprocess = launch_python_subprocess_method
     runShellKillPgrp_method = MagicMock()
     runShellKillPgrp_method.side_effect = lambda python : python.terminate()
     executor.runShellKillPgrp = runShellKillPgrp_method
     subproc_mock.returncode = 0
     subproc_mock.should_finish_event.set()
-    result = executor.run_file("command", "file", tmpoutfile, tmperrfile)
+    result = executor.run_file("file", ["arg1", "arg2"], tmpoutfile, tmperrfile, PYTHON_TIMEOUT_SECONDS)
     self.assertEquals(result, {'exitcode': 0, 'stderr': 'Dummy err', 'stdout': 'Dummy output'})