Forráskód Böngészése

AMBARI-3749. Resource Management bugfixes (Andrew Onischuk via dlysnichenko)

Lisnichenko Dmitro 11 éve
szülő
commit
d74bf3d75d
18 módosított fájl, 197 hozzáadás és 92 törlés
  1. 1 0
      ambari-agent/src/main/python/resource_management/core/__init__.py
  2. 33 36
      ambari-agent/src/main/python/resource_management/core/environment.py
  3. 1 1
      ambari-agent/src/main/python/resource_management/core/providers/__init__.py
  4. 8 1
      ambari-agent/src/main/python/resource_management/core/providers/system.py
  5. 3 3
      ambari-agent/src/main/python/resource_management/core/resources/system.py
  6. 11 5
      ambari-agent/src/main/python/resource_management/core/shell.py
  7. 2 1
      ambari-agent/src/main/python/resource_management/core/utils.py
  8. 2 0
      ambari-agent/src/main/python/resource_management/libraries/__init__.py
  9. 2 0
      ambari-agent/src/main/python/resource_management/libraries/functions/__init__.py
  10. 19 0
      ambari-agent/src/main/python/resource_management/libraries/functions/default.py
  11. 24 0
      ambari-agent/src/main/python/resource_management/libraries/functions/format.py
  12. 23 13
      ambari-agent/src/main/python/resource_management/libraries/providers/execute_hadoop.py
  13. 8 6
      ambari-agent/src/main/python/resource_management/libraries/providers/template_config.py
  14. 9 7
      ambari-agent/src/main/python/resource_management/libraries/providers/xml_config.py
  15. 1 1
      ambari-agent/src/main/python/resource_management/libraries/resources/execute_hadoop.py
  16. 1 0
      ambari-agent/src/main/python/resource_management/libraries/resources/template_config.py
  17. 1 0
      ambari-agent/src/main/python/resource_management/libraries/script/__init__.py
  18. 48 18
      ambari-agent/src/main/python/resource_management/libraries/script/script.py

+ 1 - 0
ambari-agent/src/main/python/resource_management/core/__init__.py

@@ -5,5 +5,6 @@ from resource_management.core.providers import *
 from resource_management.core.resources import *
 from resource_management.core.source import *
 from resource_management.core.system import *
+from resource_management.core.shell import *
 
 __version__ = "0.4.1"

+ 33 - 36
ambari-agent/src/main/python/resource_management/core/environment.py

@@ -1,36 +1,33 @@
 #!/usr/bin/env python
 
-__all__ = ["Environment","format"]
+__all__ = ["Environment"]
 
 import logging
-import sys
 import os
 import shutil
 import time
 from datetime import datetime
-from string import Formatter
 
 from resource_management.core import shell
 from resource_management.core.exceptions import Fail
 from resource_management.core.providers import find_provider
-from resource_management.core.utils import AttributeDictionary, checked_unite
+from resource_management.core.utils import AttributeDictionary
 from resource_management.core.system import System
 
 
 class Environment(object):
   _instances = []
 
-  def __init__(self, basedir=None, params=None):
+  def __init__(self, basedir=None):
     """
     @param basedir: basedir/files, basedir/templates are the places where templates / static files
     are looked up
     @param params: configurations dictionary (this will be accessible in the templates)
     """
     self.log = logging.getLogger("resource_management")
-    self.formatter = ConfigurationFormatter()
-    self.reset(basedir, params)
+    self.reset(basedir)
 
-  def reset(self, basedir, params):
+  def reset(self, basedir):
     self.system = System.get_instance()
     self.config = AttributeDictionary()
     self.resources = {}
@@ -46,7 +43,7 @@ class Environment(object):
       # dir where templates,failes dirs are 
       'basedir': basedir, 
       # variables, which can be used in templates
-      'params': params.copy(),
+      'params': {},
     })
 
   def backup_file(self, path):
@@ -69,14 +66,21 @@ class Environment(object):
       if overwrite or path[-1] not in attr:
         attr[path[-1]] = value
         
-  def add_params(self, params):
-    variables = [item for item in dir(params) if not item.startswith("__")]
-
-    for variable in variables:
-      value = getattr(params, variable)
-      if not hasattr(value, '__call__'):
-        if variable in self.config.params:
-          raise Fail("Variable %s already exists in the resource management parameters" % variable)
+  def set_params(self, arg):
+    """
+    @param arg: is a dictionary of configurations, or a module with the configurations
+    """
+    if isinstance(arg, dict):
+      variables = arg
+    else:
+      variables = dict((var, getattr(arg, var)) for var in dir(arg))
+    
+    for variable, value in variables.iteritems():
+      # don't include system variables, methods, classes, modules
+      if not variable.startswith("__") and \
+          not hasattr(value, '__call__')and \
+          not hasattr(value, '__module__') and \
+          not hasattr(value, '__file__'):
         self.config.params[variable] = value
         
   def run_action(self, resource, action):
@@ -147,6 +151,17 @@ class Environment(object):
   @classmethod
   def get_instance(cls):
     return cls._instances[-1]
+  
+  @classmethod
+  def get_instance_copy(cls):
+    """
+    Copy only configurations, but not resources execution state
+    """
+    old_instance = cls.get_instance()
+    new_instance = Environment()
+    new_instance.config = old_instance.config.copy()
+    
+    return new_instance
 
   def __enter__(self):
     self.__class__._instances.append(self)
@@ -169,22 +184,4 @@ class Environment(object):
     self.config = state['config']
     self.resources = state['resources']
     self.resource_list = state['resource_list']
-    self.delayed_actions = state['delayed_actions']
-    
-class ConfigurationFormatter(Formatter):
-  def format(self, format_string, *args, **kwargs):
-    env = Environment.get_instance()
-    variables = kwargs
-    params = env.config.params
-    
-    result = checked_unite(variables, params)
-    return self.vformat(format_string, args, result)
-  
-def format(format_string, *args, **kwargs):
-  env = Environment.get_instance()
-  variables = sys._getframe(1).f_locals
-  
-  result = checked_unite(kwargs, variables)
-  result.pop("self", None) # self kwarg would result in an error
-  return env.formatter.format(format_string, args, **result)
-  
+    self.delayed_actions = state['delayed_actions']

+ 1 - 1
ambari-agent/src/main/python/resource_management/core/providers/__init__.py

@@ -41,7 +41,7 @@ PROVIDERS = dict(
     Directory="resource_management.core.providers.system.DirectoryProvider",
     Link="resource_management.core.providers.system.LinkProvider",
     Execute="resource_management.core.providers.system.ExecuteProvider",
-    Script="resource_management.core.providers.system.ScriptProvider",
+    ExecuteScript="resource_management.core.providers.system.ExecuteScriptProvider",
     Mount="resource_management.core.providers.mount.MountProvider",
     User="resource_management.core.providers.accounts.UserProvider",
     Group="resource_management.core.providers.accounts.GroupProvider",

+ 8 - 1
ambari-agent/src/main/python/resource_management/core/providers/system.py

@@ -131,6 +131,10 @@ class DirectoryProvider(Provider):
       if self.resource.recursive:
         os.makedirs(path, self.resource.mode or 0755)
       else:
+        dirname = os.path.dirname(path)
+        if not os.path.isdir(dirname):
+          raise Fail("Applying %s failed, parent directory %s doesn't exist" % (self.resource, dirname))
+        
         os.mkdir(path, self.resource.mode or 0755)
       self.resource.updated()
       
@@ -214,6 +218,9 @@ class ExecuteProvider(Provider):
     self.log.info("Executing %s" % self.resource)
     
     if self.resource.path != []:
+      if not self.resource.environment:
+        self.resource.environment = {}
+      
       self.resource.environment['PATH'] = os.pathsep.join(self.resource.path) 
     
     for i in range (0, self.resource.tries):
@@ -232,7 +239,7 @@ class ExecuteProvider(Provider):
     self.resource.updated()
        
 
-class ScriptProvider(Provider):
+class ExecuteScriptProvider(Provider):
   def action_run(self):
     from tempfile import NamedTemporaryFile
 

+ 3 - 3
ambari-agent/src/main/python/resource_management/core/resources/system.py

@@ -1,4 +1,4 @@
-__all__ = ["File", "Directory", "Link", "Execute", "Script", "Mount"]
+__all__ = ["File", "Directory", "Link", "Execute", "ExecuteScript", "Mount"]
 
 from resource_management.core.base import Resource, ForcedListArgument, ResourceArgument, BooleanArgument
 
@@ -53,7 +53,7 @@ class Execute(Resource):
   creates = ResourceArgument()
   cwd = ResourceArgument()
   # this runs command with a specific env variables, env={'JAVA_HOME': '/usr/jdk'}
-  environment = ResourceArgument(default={})
+  environment = ResourceArgument()
   user = ResourceArgument()
   group = ResourceArgument()
   returns = ForcedListArgument(default=0)
@@ -64,7 +64,7 @@ class Execute(Resource):
   logoutput = BooleanArgument(default=False)
 
 
-class Script(Resource):
+class ExecuteScript(Resource):
   action = ForcedListArgument(default="run")
   code = ResourceArgument(required=True)
   cwd = ResourceArgument()

+ 11 - 5
ambari-agent/src/main/python/resource_management/core/shell.py

@@ -1,5 +1,8 @@
+__all__ = ["checked_call"]
+
 import logging
 import subprocess
+import pipes
 from exceptions import Fail
 
 log = logging.getLogger("resource_management.provider")
@@ -25,11 +28,14 @@ def _call(command, logoutput=False, throw_on_failure=True,
   
   @return: retrun_code, stdout
   """
-  
-  shell = not isinstance(command, (list, tuple))
-  
+  # convert to string and escape
+  if isinstance(command, (list, tuple)):
+    command = ' '.join(pipes.quote(x) for x in command)
+
+  command = ["/bin/bash","--login","-c", command]
+
   proc = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
-                          cwd=cwd, env=env, shell=shell,
+                          cwd=cwd, env=env, shell=False,
                           preexec_fn=preexec_fn)
   
   out = proc.communicate()[0]
@@ -39,7 +45,7 @@ def _call(command, logoutput=False, throw_on_failure=True,
     log.info(out)
   
   if throw_on_failure and code:
-    err_msg = ("Execution of '%s' returned %d. %s") % (command, code, out)
+    err_msg = ("Execution of '%s' returned %d. %s") % (command[-1], code, out)
     raise Fail(err_msg)
   
   return code, out

+ 2 - 1
ambari-agent/src/main/python/resource_management/core/utils.py

@@ -74,7 +74,8 @@ class AttributeDictionary(object):
 def checked_unite(dict1, dict2):
   for key in dict1:
     if key in dict2:
-      raise Fail("Variable '%s' already exists more than once as a variable/configuration/kwarg parameter. Cannot evaluate it." % key)
+      if not dict2[key] is dict1[key]: # it's not a big deal if this is the same variable
+        raise Fail("Variable '%s' already exists more than once as a variable/configuration/kwarg parameter. Cannot evaluate it." % key)
   
   result = dict1.copy()
   result.update(dict2)

+ 2 - 0
ambari-agent/src/main/python/resource_management/libraries/__init__.py

@@ -1,2 +1,4 @@
+from resource_management.libraries.functions import *
 from resource_management.libraries.resources import *
 from resource_management.libraries.providers import *
+from resource_management.libraries.script import *

+ 2 - 0
ambari-agent/src/main/python/resource_management/libraries/functions/__init__.py

@@ -0,0 +1,2 @@
+from resource_management.libraries.functions.default import *
+from resource_management.libraries.functions.format import *

+ 19 - 0
ambari-agent/src/main/python/resource_management/libraries/functions/default.py

@@ -0,0 +1,19 @@
+__all__ = ["default"]
+
+from resource_management.libraries.script import Script
+default_subdict='/configurations/global'
+
+def default(name, default_value=None):
+  subdicts = filter(None, name.split('/'))
+  
+  if not name.startswith('/'):
+    subdicts = filter(None, default_subdict.split('/')) + subdicts
+
+  curr_dict = Script.get_config()
+  for x in subdicts:
+    if x in curr_dict:
+      curr_dict = curr_dict[x]
+    else:
+      return default_value
+
+  return curr_dict

+ 24 - 0
ambari-agent/src/main/python/resource_management/libraries/functions/format.py

@@ -0,0 +1,24 @@
+__all__ = ["format"]
+import sys
+from string import Formatter
+from resource_management.core.exceptions import Fail
+from resource_management.core.utils import checked_unite
+from resource_management.core.environment import Environment
+
+
+class ConfigurationFormatter(Formatter):
+  def format(self, format_string, *args, **kwargs):
+    env = Environment.get_instance()
+    variables = kwargs
+    params = env.config.params
+    
+    result = checked_unite(variables, params)
+    return self.vformat(format_string, args, result)
+      
+  
+def format(format_string, *args, **kwargs):
+  variables = sys._getframe(1).f_locals
+  
+  result = checked_unite(kwargs, variables)
+  result.pop("self", None) # self kwarg would result in an error
+  return ConfigurationFormatter().format(format_string, args, **result)

+ 23 - 13
ambari-agent/src/main/python/resource_management/libraries/providers/execute_hadoop.py

@@ -1,22 +1,32 @@
+import pipes
 from resource_management import *
 
 class ExecuteHadoopProvider(Provider):
   def action_run(self):
-    kinit_path_local = self.resource.kinit_path_local
+    kinit__path_local = self.resource.kinit_path_local
     keytab = self.resource.keytab
-    principal = self.resource.principal
     conf_dir = self.resource.conf_dir
     command = self.resource.command
     
-    if self.resource.security_enabled and not self.resource.kinit_override:
-      Execute ((kinit_path_local, '-kt', keytab, principal),
-        path = ['/bin'],
-        user = self.resource.user
+    if self.resource.principal:
+      principal = self.resource.user
+    else:
+      principal = self.resource.principal
+    
+    if isinstance(command, (list, tuple)):
+      command = ' '.join(pipes.quote(x) for x in command)
+    
+    with Environment.get_instance_copy() as env:
+      if self.resource.security_enabled and not self.resource.kinit_override:
+        Execute (format("{kinit__path_local} -kt {keytab} {principal}"),
+          path = ['/bin'],
+          user = self.resource.user
+        )
+    
+      Execute (format("hadoop --config {conf_dir} {command}"),
+        user        = self.resource.user,
+        tries       = self.resource.tries,
+        try_sleep   = self.resource.try_sleep,
+        logoutput   = self.resource.logoutput,
       )
-
-    Execute (('hadoop', '--config', conf_dir, command),
-      user        = self.resource.user,
-      tries       = self.resource.tries,
-      try_sleep   = self.resource.try_sleep,
-      logoutput   = self.resource.logoutput,
-    )
+    env.run()

+ 8 - 6
ambari-agent/src/main/python/resource_management/libraries/providers/template_config.py

@@ -12,9 +12,11 @@ class TemplateConfigProvider(Provider):
     else:
       template_name = format("{file_name}-{template_tag}.j2")
 
-    File( qualified_file_name,
-     owner   = self.resource.owner,
-     group   = self.resource.group,
-     mode    = self.resource.mode,
-     content = Template(template_name)
-    )
+    with Environment.get_instance_copy() as env:
+      File( qualified_file_name,
+       owner   = self.resource.owner,
+       group   = self.resource.group,
+       mode    = self.resource.mode,
+       content = Template(template_name, extra_imports=self.resource.extra_imports)
+      )
+    env.run()

+ 9 - 7
ambari-agent/src/main/python/resource_management/libraries/providers/xml_config.py

@@ -19,10 +19,12 @@ class XmlConfigProvider(Provider):
    
   
     self.log.debug(format("Generating config: {conf_dir}/{filename}"))
-  
-    File (format("{conf_dir}/{filename}"),
-      content = config_content,
-      owner = self.resource.owner,
-      group = self.resource.group,
-      mode = self.resource.mode
-    )
+    
+    with Environment.get_instance_copy() as env:
+      File (format("{conf_dir}/{filename}"),
+        content = config_content,
+        owner = self.resource.owner,
+        group = self.resource.group,
+        mode = self.resource.mode
+      )
+    env.run()

+ 1 - 1
ambari-agent/src/main/python/resource_management/libraries/resources/execute_hadoop.py

@@ -9,12 +9,12 @@ class ExecuteHadoop(Resource):
   try_sleep = ResourceArgument(default=0) # seconds
   user = ResourceArgument()
   logoutput = BooleanArgument(default=False)
+  principal = ResourceArgument()
   
   conf_dir = ResourceArgument()
   
   security_enabled = BooleanArgument(default=False)
   keytab = ResourceArgument()
-  principal = ResourceArgument()
   kinit_path_local = ResourceArgument()
   
   actions = Resource.actions + ["run"]

+ 1 - 0
ambari-agent/src/main/python/resource_management/libraries/resources/template_config.py

@@ -8,5 +8,6 @@ class TemplateConfig(Resource):
   owner = ResourceArgument()
   group = ResourceArgument()
   template_tag = ResourceArgument()
+  extra_imports = ResourceArgument(default=[])
 
   actions = Resource.actions + ["create"]

+ 1 - 0
ambari-agent/src/main/python/resource_management/libraries/script/__init__.py

@@ -0,0 +1 @@
+from resource_management.libraries.script.script import *

+ 48 - 18
ambari-agent/src/main/python/resource_management/core/script.py → ambari-agent/src/main/python/resource_management/libraries/script/script.py

@@ -17,13 +17,14 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 See the License for the specific language governing permissions and
 limitations under the License.
 '''
+__all__ = ["Script"]
 
 import sys
 import json
 import logging
 
-from resource_management.environment import Environment
-from resource_management.exceptions import Fail
+from resource_management.core.environment import Environment
+from resource_management.core.exceptions import Fail
 
 
 class Script():
@@ -32,15 +33,6 @@ class Script():
   tmpoutfile and to tmperrfile respectively.
   """
 
-  def __init__(self):
-    pass
-
-
-  def start(self, env, params):  # TODO: just for test runs; remove
-    print "Start!"
-    pass
-
-
   def execute(self):
     """
     Sets up logging;
@@ -57,6 +49,7 @@ class Script():
     cherr.setLevel(logging.ERROR)
     cherr.setFormatter(formatter)
     logger.addHandler(cherr)
+    logger.addHandler(chout)
     # parse arguments
     if len(sys.argv) < 1+3:
       logger.error("Script expects at least 3 arguments")
@@ -68,7 +61,7 @@ class Script():
     try:
       with open(command_data_file, "r") as f:
         pass
-        params = json.load(f)
+        Script.config = ConfigDictionary(json.load(f))
     except IOError:
       logger.exception("Can not read json file with command parameters: ")
       sys.exit(1)
@@ -79,14 +72,16 @@ class Script():
       sys.exit(1)
     method = getattr(self, command_type)
     try:
-      with Environment(basedir, params) as env:
-        method(env, params)
+      with Environment(basedir) as env:
+        method(env)
       env.run()
     except Fail:
       logger.exception("Got exception while executing method '{0}':".format(command_type))
       sys.exit(1)
-
-
+      
+  @staticmethod
+  def get_config():
+    return Script.config
 
   def fail_with_error(self, message):
     """
@@ -96,6 +91,41 @@ class Script():
     sys.stderr.write("Error: " + message)
     sys.exit(1)
 
+class ConfigDictionary(dict):
+  """
+  Immutable config dictionary
+  """
+  
+  def __init__(self, dictionary):
+    """
+    Recursively turn dict to ConfigDictionary
+    """
+    for k, v in dictionary.iteritems():
+      if isinstance(v, dict):
+        dictionary[k] = ConfigDictionary(v)
+        
+    super(ConfigDictionary, self).__init__(dictionary)
 
-if __name__ == "__main__":
-  Script().execute()
+  def __setitem__(self, name, value):
+    raise Fail("Configuration dictionary is immutable!")
+
+  def __getitem__(self, name):
+    """
+    Use Python types
+    """
+    value = super(ConfigDictionary, self).__getitem__(name)
+    
+    if value == "true":
+      value = True
+    elif value == "false":
+      value = False
+    else: 
+      try:
+        value = int(value)
+      except (ValueError, TypeError):
+        try:
+          value =  float(value)
+        except (ValueError, TypeError):
+          pass
+    
+    return value