Selaa lähdekoodia

AMBARI-3924. Resource management framework: resources should be executed immediately (Andrew Onischuk via dlysnichenko)

Lisnichenko Dmitro 11 vuotta sitten
vanhempi
commit
929647d93d

+ 3 - 32
ambari-agent/src/main/python/resource_management/core/base.py

@@ -95,12 +95,9 @@ class Resource(object):
   __metaclass__ = ResourceMetaclass
 
   log = logging.getLogger("resource_management.resource")
-  is_updated = False
 
   action = ForcedListArgument(default="nothing")
   ignore_failures = BooleanArgument(default=False)
-  notifies = ResourceArgument(default=[]) # this is not supported/recommended
-  subscribes = ResourceArgument(default=[]) # this is not supported/recommended
   not_if = ResourceArgument() # pass command e.g. not_if = ('ls','/root/jdk')
   only_if = ResourceArgument() # pass command
   initial_wait = ResourceArgument() # in seconds
@@ -159,33 +156,13 @@ class Resource(object):
           raise InvalidArgument("%s %s" % (self, exc))
 
     Resource.log.debug("New resource %s: %s" % (self, self.arguments))
-    self.subscriptions = {'immediate': set(), 'delayed': set()}
-
-    for sub in self.subscribes:
-      if len(sub) == 2:
-        action, res = sub
-        immediate = False
-      else:
-        action, res, immediate = sub
-
-      res.subscribe(action, self, immediate)
-
-    for sub in self.notifies:
-      self.subscribe(*sub)
-
-    self.validate()
+    
+    if not self.env.test_mode:
+      self.env.run()
 
   def validate(self):
     pass
 
-  def subscribe(self, action, resource, immediate=False):
-    imm = "immediate" if immediate else "delayed"
-    sub = (action, resource)
-    self.subscriptions[imm].add(sub)
-
-  def updated(self):
-    self.is_updated = True
-
   def override(self, **kwargs):
     for key, value in kwargs.items():
       try:
@@ -215,9 +192,6 @@ class Resource(object):
       name=self.name,
       provider=self.provider,
       arguments=self.arguments,
-      subscriptions=self.subscriptions,
-      subscribes=self.subscribes,
-      notifies=self.notifies,
       env=self.env,
     )
 
@@ -225,9 +199,6 @@ class Resource(object):
     self.name = state['name']
     self.provider = state['provider']
     self.arguments = state['arguments']
-    self.subscriptions = state['subscriptions']
-    self.subscribes = state['subscribes']
-    self.notifies = state['notifies']
     self.env = state['env']
 
     Resource.log = logging.getLogger("resource_management.resource")

+ 7 - 15
ambari-agent/src/main/python/resource_management/core/environment.py

@@ -38,21 +38,22 @@ from resource_management.core.system import System
 class Environment(object):
   _instances = []
 
-  def __init__(self, basedir=None):
+  def __init__(self, basedir=None, test_mode=False):
     """
     @param basedir: basedir/files, basedir/templates are the places where templates / static files
     are looked up
-    @param params: configurations dictionary (this will be accessible in the templates)
+    @param test_mode: if this is enabled, resources won't be executed until manualy running env.run().
     """
     self.log = logging.getLogger("resource_management")
-    self.reset(basedir)
+    self.reset(basedir, test_mode)
 
-  def reset(self, basedir):
+  def reset(self, basedir, test_mode):
     self.system = System.get_instance()
     self.config = AttributeDictionary()
     self.resources = {}
     self.resource_list = []
     self.delayed_actions = set()
+    self.test_mode = test_mode
     self.update_config({
       # current time
       'date': datetime.now(),
@@ -115,16 +116,6 @@ class Environment(object):
       raise Fail("%r does not implement action %s" % (provider, action))
     provider_action()
 
-    if resource.is_updated:
-      for action, res in resource.subscriptions['immediate']:
-        self.log.info(
-          "%s sending %s action to %s (immediate)" % (resource, action, res))
-        self.run_action(res, action)
-      for action, res in resource.subscriptions['delayed']:
-        self.log.info(
-          "%s sending %s action to %s (delayed)" % (resource, action, res))
-      self.delayed_actions |= resource.subscriptions['delayed']
-
   def _check_condition(self, cond):
     if hasattr(cond, '__call__'):
       return cond()
@@ -138,7 +129,8 @@ class Environment(object):
   def run(self):
     with self:
       # Run resource actions
-      for resource in self.resource_list:
+      while self.resource_list:
+        resource = self.resource_list.pop(0)
         self.log.debug("Running resource %r" % resource)
         
         if resource.initial_wait:

+ 0 - 4
ambari-agent/src/main/python/resource_management/core/providers/accounts.py

@@ -60,13 +60,11 @@ class UserProvider(Provider):
     command.append(self.resource.username)
 
     shell.checked_call(command)
-    self.resource.updated()
 
   def action_remove(self):
     if self.user:
       command = ['userdel', self.resource.username]
       shell.checked_call(command)
-      self.resource.updated()
       self.log.info("Removed user %s" % self.resource)
 
   @property
@@ -100,7 +98,6 @@ class GroupProvider(Provider):
     command.append(self.resource.group_name)
 
     shell.checked_call(command)
-    self.resource.updated()
 
     group = self.group
 
@@ -108,7 +105,6 @@ class GroupProvider(Provider):
     if self.group:
       command = ['groupdel', self.resource.group_name]
       shell.checked_call(command)
-      self.resource.updated()
       self.log.info("Removed group %s" % self.resource)
 
   @property

+ 0 - 3
ambari-agent/src/main/python/resource_management/core/providers/mount.py

@@ -48,14 +48,12 @@ class MountProvider(Provider):
       check_call(args)
 
       self.log.info("%s mounted" % self)
-      self.resource.updated()
 
   def action_umount(self):
     if self.is_mounted():
       check_call(["umount", self.resource.mount_point])
 
       self.log.info("%s unmounted" % self)
-      self.resource.updated()
     else:
       self.log.debug("%s is not mounted" % self)
 
@@ -79,7 +77,6 @@ class MountProvider(Provider):
         ))
 
       self.log.info("%s enabled" % self)
-      self.resource.updated()
 
   def action_disable(self):
     pass # TODO

+ 0 - 6
ambari-agent/src/main/python/resource_management/core/providers/service.py

@@ -31,28 +31,22 @@ class ServiceProvider(Provider):
   def action_start(self):
     if not self.status():
       self._exec_cmd("start", 0)
-      self.resource.updated()
 
   def action_stop(self):
     if self.status():
       self._exec_cmd("stop", 0)
-      self.resource.updated()
 
   def action_restart(self):
     if not self.status():
       self._exec_cmd("start", 0)
-      self.resource.updated()
     else:
       self._exec_cmd("restart", 0)
-      self.resource.updated()
 
   def action_reload(self):
     if not self.status():
       self._exec_cmd("start", 0)
-      self.resource.updated()
     else:
       self._exec_cmd("reload", 0)
-      self.resource.updated()
 
   def status(self):
     return self._exec_cmd("status") == 0

+ 4 - 22
ambari-agent/src/main/python/resource_management/core/providers/system.py

@@ -56,7 +56,6 @@ def _coerce_gid(group):
 
 def _ensure_metadata(path, user, group, mode=None, log=None):
   stat = os.stat(path)
-  updated = False
 
   if mode:
     existing_mode = stat.st_mode & 07777
@@ -64,7 +63,6 @@ def _ensure_metadata(path, user, group, mode=None, log=None):
       log and log.info("Changing permission for %s from %o to %o" % (
       path, existing_mode, mode))
       os.chmod(path, mode)
-      updated = True
 
   if user:
     uid = _coerce_uid(user)
@@ -72,7 +70,6 @@ def _ensure_metadata(path, user, group, mode=None, log=None):
       log and log.info(
         "Changing owner for %s from %d to %s" % (path, stat.st_uid, user))
       os.chown(path, uid, -1)
-      updated = True
 
   if group:
     gid = _coerce_gid(group)
@@ -80,9 +77,6 @@ def _ensure_metadata(path, user, group, mode=None, log=None):
       log and log.info(
         "Changing group for %s from %d to %s" % (path, stat.st_gid, group))
       os.chown(path, -1, gid)
-      updated = True
-
-  return updated
 
 
 class FileProvider(Provider):
@@ -116,12 +110,10 @@ class FileProvider(Provider):
       with open(path, "wb") as fp:
         if content:
           fp.write(content)
-      self.resource.updated()
 
-    if _ensure_metadata(self.resource.path, self.resource.owner,
+    _ensure_metadata(self.resource.path, self.resource.owner,
                         self.resource.group, mode=self.resource.mode,
-                        log=self.log):
-      self.resource.updated()
+                        log=self.log)
 
   def action_delete(self):
     path = self.resource.path
@@ -132,7 +124,6 @@ class FileProvider(Provider):
     if os.path.exists(path):
       self.log.info("Deleting %s" % self.resource)
       os.unlink(path)
-      self.resource.updated()
 
   def _get_content(self):
     content = self.resource.content
@@ -158,14 +149,12 @@ class DirectoryProvider(Provider):
           raise Fail("Applying %s failed, parent directory %s doesn't exist" % (self.resource, dirname))
         
         os.mkdir(path, self.resource.mode or 0755)
-      self.resource.updated()
       
     if not os.path.isdir(path):
       raise Fail("Applying %s failed, file %s already exists" % (self.resource, path))
 
-    if _ensure_metadata(path, self.resource.owner, self.resource.group,
-                        mode=self.resource.mode, log=self.log):
-      self.resource.updated()
+    _ensure_metadata(path, self.resource.owner, self.resource.group,
+                        mode=self.resource.mode, log=self.log)
 
   def action_delete(self):
     path = self.resource.path
@@ -175,7 +164,6 @@ class DirectoryProvider(Provider):
       
       self.log.info("Removing directory %s and all its content" % self.resource)
       shutil.rmtree(path)
-      self.resource.updated()
 
 
 class LinkProvider(Provider):
@@ -200,21 +188,18 @@ class LinkProvider(Provider):
       
       self.log.info("Creating hard %s" % self.resource)
       os.link(self.resource.to, path)
-      self.resource.updated()
     else:
       if not os.path.exists(self.resource.to):
         self.log.info("Warning: linking to nonexistent location %s", self.resource.to)
         
       self.log.info("Creating symbolic %s" % self.resource)
       os.symlink(self.resource.to, path)
-      self.resource.updated()
 
   def action_delete(self):
     path = self.resource.path
     if os.path.exists(path):
       self.log.info("Deleting %s" % self.resource)
       os.unlink(path)
-      self.resource.updated()
 
 
 def _preexec_fn(resource):
@@ -257,8 +242,6 @@ class ExecuteProvider(Provider):
         else:
           self.log.info("Retrying after %d seconds. Reason: %s", self.resource.try_sleep, str(ex))
           time.sleep(self.resource.try_sleep)
-
-    self.resource.updated()
        
 
 class ExecuteScriptProvider(Provider):
@@ -274,4 +257,3 @@ class ExecuteScriptProvider(Provider):
       shell.call([self.resource.interpreter, tf.name],
                       cwd=self.resource.cwd, env=self.resource.environment,
                       preexec_fn=_preexec_fn(self.resource))
-    self.resource.updated()

+ 0 - 1
ambari-agent/src/main/python/resource_management/libraries/providers/execute_hadoop.py

@@ -51,4 +51,3 @@ class ExecuteHadoopProvider(Provider):
         try_sleep   = self.resource.try_sleep,
         logoutput   = self.resource.logoutput,
       )
-    env.run()

+ 0 - 1
ambari-agent/src/main/python/resource_management/libraries/providers/template_config.py

@@ -41,4 +41,3 @@ class TemplateConfigProvider(Provider):
        mode    = self.resource.mode,
        content = Template(template_name, extra_imports=self.resource.extra_imports)
       )
-    env.run()

+ 0 - 1
ambari-agent/src/main/python/resource_management/libraries/providers/xml_config.py

@@ -49,4 +49,3 @@ class XmlConfigProvider(Provider):
         group = self.resource.group,
         mode = self.resource.mode
       )
-    env.run()

+ 0 - 1
ambari-agent/src/main/python/resource_management/libraries/script/script.py

@@ -78,7 +78,6 @@ class Script():
     try:
       with Environment(basedir) as env:
         method(env)
-      env.run()
     except Fail:
       logger.exception("Got exception while executing method '{0}':".format(command_type))
       sys.exit(1)

+ 12 - 12
ambari-agent/src/test/python/resource_management/TestFileResource.py

@@ -44,7 +44,7 @@ class TestFileResource(TestCase):
              mode=0777,
              content='file-content'
         )
-      env.run()
+      
       self.fail("Must fail when directory with name 'path' exist")
     except Fail as e:
       self.assertEqual("Applying File['/existent_directory'] failed, directory with name /existent_directory exists",
@@ -67,7 +67,7 @@ class TestFileResource(TestCase):
              mode=0777,
              content='file-content'
         )
-      env.run()
+      
       self.fail('Must fail on non existent parent directory')
     except Fail as e:
       self.assertEqual(
@@ -93,7 +93,7 @@ class TestFileResource(TestCase):
            mode=0777,
            content='file-content'
       )
-    env.run()
+    
 
     open_mock.assert_called_with('/directory/file', 'wb')
     new_file.__enter__().write.assert_called_with('file-content')
@@ -123,7 +123,7 @@ class TestFileResource(TestCase):
            content='new-content'
       )
 
-    env.run()
+    
     old_file.read.assert_called()
     new_file.__enter__().write.assert_called_with('new-content')
     ensure_mock.assert_called()
@@ -149,7 +149,7 @@ class TestFileResource(TestCase):
              backup=False,
              content='new-content'
         )
-      env.run()
+      
       self.fail("Should fail when deleting directory")
     except Fail:
       pass
@@ -174,7 +174,7 @@ class TestFileResource(TestCase):
            backup=False,
            content='new-content'
       )
-    env.run()
+    
 
     self.assertEqual(isdir_mock.call_count, 1)
     self.assertEqual(exist_mock.call_count, 1)
@@ -195,7 +195,7 @@ class TestFileResource(TestCase):
              mode=0777,
              content='file-content'
         )
-      env.run()
+      
       self.fail("Must fail when directory with name 'path' exist")
     except Fail as e:
       pass
@@ -222,7 +222,7 @@ class TestFileResource(TestCase):
            backup=False,
            content='new-content'
       )
-    env.run()
+    
 
     self.assertEqual(backup_file_mock.call_count, 0)
 
@@ -233,7 +233,7 @@ class TestFileResource(TestCase):
            backup=True,
            content='new-content'
       )
-    env.run()
+    
 
     self.assertEqual(backup_file_mock.call_count, 1)
     backup_file_mock.assert_called_with('/directory/file')
@@ -262,7 +262,7 @@ class TestFileResource(TestCase):
            replace=False
       )
 
-    env.run()
+    
     old_file.read.assert_called()
     self.assertEqual(new_file.__enter__().write.call_count, 0)
     ensure_mock.assert_called()
@@ -303,7 +303,7 @@ class TestFileResource(TestCase):
            owner='root',
            group='hdfs'
       )
-    env.run()
+    
 
     open_mock.assert_called_with('/directory/file', 'wb')
     self.assertEqual(open_mock.call_count, 1)
@@ -326,7 +326,7 @@ class TestFileResource(TestCase):
            owner='root',
            group='hdfs'
       )
-    env.run()
+    
 
     self.assertEqual(chmod_mock.call_count, 1)
     self.assertEqual(chown_mock.call_count, 0)

+ 5 - 5
ambari-agent/src/test/python/resource_management/TestGroupResource.py

@@ -42,7 +42,7 @@ class TestGroupResource(TestCase):
             action='create',
             password='secure'
       )
-    env.run()
+    
 
     self.assertEqual(popen_mock.call_count, 1)
     popen_mock.assert_called_with(['/bin/bash', '--login', '-c', 'groupadd -p secure hadoop'], shell=False, preexec_fn=None, stderr=-2, stdout=-1, env=None, cwd=None)
@@ -63,7 +63,7 @@ class TestGroupResource(TestCase):
             gid=2,
             password='secure'
       )
-    env.run()
+    
 
     self.assertEqual(popen_mock.call_count, 1)
     popen_mock.assert_called_with(['/bin/bash', '--login', '-c', 'groupmod -p secure -g 2 mapred'], shell=False, preexec_fn=None, stderr=-2, stdout=-1, env=None, cwd=None)
@@ -85,7 +85,7 @@ class TestGroupResource(TestCase):
               gid=2,
               password='secure'
         )
-      env.run()
+      
       self.fail("Action 'create' should fail when checked_call fails")
     except Fail:
       pass
@@ -107,7 +107,7 @@ class TestGroupResource(TestCase):
       Group('mapred',
             action='remove'
       )
-    env.run()
+    
 
     self.assertEqual(popen_mock.call_count, 1)
     popen_mock.assert_called_with(['/bin/bash', '--login', '-c', 'groupdel mapred'], shell=False, preexec_fn=None, stderr=-2, stdout=-1, env=None, cwd=None)
@@ -128,7 +128,7 @@ class TestGroupResource(TestCase):
         Group('mapred',
               action='remove'
         )
-      env.run()
+      
       self.fail("Action 'delete' should fail when checked_call fails")
     except Fail:
       pass

+ 3 - 3
ambari-agent/src/test/python/ambari_agent/TestScript.py → ambari-agent/src/test/python/resource_management/TestScript.py

@@ -62,21 +62,21 @@ class TestScript(TestCase):
     }
 
     # Testing config without any keys
-    with Environment(".") as env:
+    with Environment(".", test_mode=True) as env:
       script = Script()
       Script.config = no_such_entry_config
       script.install_packages(env)
     self.assertEquals(len(env.resource_list), 0)
 
     # Testing empty package list
-    with Environment(".") as env:
+    with Environment(".", test_mode=True) as env:
       script = Script()
       Script.config = empty_config
       script.install_packages(env)
     self.assertEquals(len(env.resource_list), 0)
 
     # Testing installing of a list of packages
-    with Environment(".") as env:
+    with Environment(".", test_mode=True) as env:
       Script.config = dummy_config
       script.install_packages("env")
     resource_dump = pprint.pformat(env.resource_list)

+ 10 - 20
ambari-agent/src/test/python/resource_management/TestUserResource.py

@@ -37,11 +37,10 @@ class TestUserResource(TestCase):
     getpwnam_mock.return_value = None
     with Environment('/') as env:
       user = User("mapred", action = "create")
-    env.run()
+    
 
     popen_mock.assert_called_with(['/bin/bash', '--login', '-c', 'useradd -m -s /bin/bash mapred'], shell=False, preexec_fn=None, stderr=-2, stdout=-1, env=None, cwd=None)
     self.assertEqual(popen_mock.call_count, 1)
-    self.assertTrue(user.is_updated)
 
   @patch.object(subprocess, "Popen")
   @patch.object(pwd, "getpwnam")
@@ -53,11 +52,10 @@ class TestUserResource(TestCase):
 
     with Environment('/') as env:
       user = User("mapred", action = "create")
-    env.run()
+    
 
     popen_mock.assert_called_with(['/bin/bash', '--login', '-c', 'usermod -s /bin/bash mapred'], shell=False, preexec_fn=None, stderr=-2, stdout=-1, env=None, cwd=None)
     self.assertEqual(popen_mock.call_count, 1)
-    self.assertTrue(user.is_updated)
 
   @patch.object(subprocess, "Popen")
   @patch.object(pwd, "getpwnam")
@@ -69,11 +67,10 @@ class TestUserResource(TestCase):
 
     with Environment('/') as env:
       user = User("mapred", action = "remove")
-    env.run()
+    
 
     popen_mock.assert_called_with(['/bin/bash', '--login', '-c', 'userdel mapred'], shell=False, preexec_fn=None, stderr=-2, stdout=-1, env=None, cwd=None)
     self.assertEqual(popen_mock.call_count, 1)
-    self.assertTrue(user.is_updated)
 
   @patch.object(subprocess, "Popen")
   @patch.object(pwd, "getpwnam")
@@ -87,11 +84,10 @@ class TestUserResource(TestCase):
       user = User("mapred",
                   action = "create",
                   comment = "testComment")
-    env.run()
+    
 
     popen_mock.assert_called_with(['/bin/bash', '--login', '-c', 'usermod -c testComment -s /bin/bash mapred'], shell=False, preexec_fn=None, stderr=-2, stdout=-1, env=None, cwd=None)
     self.assertEqual(popen_mock.call_count, 1)
-    self.assertTrue(user.is_updated)
 
   @patch.object(subprocess, "Popen")
   @patch.object(pwd, "getpwnam")
@@ -105,11 +101,10 @@ class TestUserResource(TestCase):
       user = User("mapred",
                   action = "create",
                   home = "/test/home")
-    env.run()
+    
 
     popen_mock.assert_called_with(['/bin/bash', '--login', '-c', 'usermod -s /bin/bash -d /test/home mapred'], shell=False, preexec_fn=None, stderr=-2, stdout=-1, env=None, cwd=None)
     self.assertEqual(popen_mock.call_count, 1)
-    self.assertTrue(user.is_updated)
 
   @patch.object(subprocess, "Popen")
   @patch.object(pwd, "getpwnam")
@@ -123,11 +118,10 @@ class TestUserResource(TestCase):
       user = User("mapred",
                   action = "create",
                   password = "secure")
-    env.run()
+    
 
     popen_mock.assert_called_with(['/bin/bash', '--login', '-c', 'usermod -s /bin/bash -p secure mapred'], shell=False, preexec_fn=None, stderr=-2, stdout=-1, env=None, cwd=None)
     self.assertEqual(popen_mock.call_count, 1)
-    self.assertTrue(user.is_updated)
 
   @patch.object(subprocess, "Popen")
   @patch.object(pwd, "getpwnam")
@@ -141,11 +135,10 @@ class TestUserResource(TestCase):
       user = User("mapred",
                   action = "create",
                   shell = "/bin/sh")
-    env.run()
+    
 
     popen_mock.assert_called_with(['/bin/bash', '--login', '-c', 'usermod -s /bin/sh mapred'], shell=False, preexec_fn=None, stderr=-2, stdout=-1, env=None, cwd=None)
     self.assertEqual(popen_mock.call_count, 1)
-    self.assertTrue(user.is_updated)
 
   @patch.object(subprocess, "Popen")
   @patch.object(pwd, "getpwnam")
@@ -159,11 +152,10 @@ class TestUserResource(TestCase):
       user = User("mapred",
                   action = "create",
                   uid = "1")
-    env.run()
+    
 
     popen_mock.assert_called_with(['/bin/bash', '--login', '-c', 'usermod -s /bin/bash -u 1 mapred'], shell=False, preexec_fn=None, stderr=-2, stdout=-1, env=None, cwd=None)
     self.assertEqual(popen_mock.call_count, 1)
-    self.assertTrue(user.is_updated)
 
   @patch.object(subprocess, "Popen")
   @patch.object(pwd, "getpwnam")
@@ -177,11 +169,10 @@ class TestUserResource(TestCase):
       user = User("mapred",
                   action = "create",
                   gid = "1")
-    env.run()
+    
 
     popen_mock.assert_called_with(['/bin/bash', '--login', '-c', 'usermod -s /bin/bash -g 1 mapred'], shell=False, preexec_fn=None, stderr=-2, stdout=-1, env=None, cwd=None)
     self.assertEqual(popen_mock.call_count, 1)
-    self.assertTrue(user.is_updated)
 
   @patch.object(subprocess, "Popen")
   @patch.object(pwd, "getpwnam")
@@ -195,8 +186,7 @@ class TestUserResource(TestCase):
       user = User("mapred",
                   action = "create",
                   groups = ['1','2','3'])
-    env.run()
+    
 
     popen_mock.assert_called_with(['/bin/bash', '--login', '-c', 'usermod -G 1,2,3 -s /bin/bash mapred'], shell=False, preexec_fn=None, stderr=-2, stdout=-1, env=None, cwd=None)
     self.assertEqual(popen_mock.call_count, 1)
-    self.assertTrue(user.is_updated)