Browse Source

AMBARI-15762. Component install post processing can not be run in parallel (aonishuk)

Andrew Onishuk 9 years ago
parent
commit
cf2c82959d

+ 1 - 0
ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py

@@ -320,6 +320,7 @@ class CustomServiceOrchestrator():
     command['public_hostname'] = public_fqdn
     # Add cache dir to make it visible for commands
     command["hostLevelParams"]["agentCacheDir"] = self.config.get('agent', 'cache_dir')
+    command["agentConfigParams"] = {"agent": {"parallel_execution": self.config.get_parallel_exec_option()}}
     # Now, dump the json file
     command_type = command['commandType']
     from ActionQueue import ActionQueue  # To avoid cyclic dependency

+ 4 - 2
ambari-agent/src/test/python/ambari_agent/TestCustomServiceOrchestrator.py

@@ -106,7 +106,7 @@ class TestCustomServiceOrchestrator(TestCase):
                          'all_hosts'     : ['h1.hortonworks.com', 'h2.hortonworks.com'],
                          'all_ping_ports': ['8670', '8670']}
     
-    config = AmbariConfig().getConfig()
+    config = AmbariConfig()
     tempdir = tempfile.gettempdir()
     config.set('agent', 'prefix', tempdir)
     dummy_controller = MagicMock()
@@ -134,6 +134,7 @@ class TestCustomServiceOrchestrator(TestCase):
     os.unlink(json_file)
     # Testing side effect of dump_command_to_json
     self.assertEquals(command['public_hostname'], "test.hst")
+    self.assertEquals(command['agentConfigParams']['agent']['parallel_execution'], 0)
     self.assertTrue(unlink_mock.called)
 
 
@@ -166,7 +167,7 @@ class TestCustomServiceOrchestrator(TestCase):
       'hostLevelParams':{}
     }
 
-    config = AmbariConfig().getConfig()
+    config = AmbariConfig()
     tempdir = tempfile.gettempdir()
     config.set('agent', 'prefix', tempdir)
     dummy_controller = MagicMock()
@@ -190,6 +191,7 @@ class TestCustomServiceOrchestrator(TestCase):
     os.unlink(json_file)
     # Testing side effect of dump_command_to_json
     self.assertEquals(command['public_hostname'], "test.hst")
+    self.assertEquals(command['agentConfigParams']['agent']['parallel_execution'], 0)
     self.assertTrue(unlink_mock.called)
 
   @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value))

+ 63 - 0
ambari-agent/src/test/python/resource_management/TestFcntlBasedProcessLock.py

@@ -0,0 +1,63 @@
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+
+import os
+import tempfile
+import time
+import shutil
+import multiprocessing
+from unittest import TestCase
+
+from only_for_platform import  not_for_platform, PLATFORM_WINDOWS
+from resource_management.libraries.functions.fcntl_based_process_lock import FcntlBasedProcessLock
+
+class TestFcntlBasedProcessLock(TestCase):
+
+
+  @not_for_platform(PLATFORM_WINDOWS)
+  def test_fcntl_based_lock(self):
+    """
+    Spawn several processes that all contend for the same FcntlBasedProcessLock.
+    A non-blocking multiprocessing.Lock inside the critical section detects
+    whether more than one process ever holds the file lock at the same time.
+    """
+    test_temp_dir = tempfile.mkdtemp(prefix="test_file_based_lock")
+    try:
+      lock_file = os.path.join(test_temp_dir, "lock")
+
+      # Raises an exception if mutex.acquire fails.
+      # It indicates that more than one process acquired the lock.
+      def dummy_task(index, mutex):
+        with FcntlBasedProcessLock(lock_file, skip_fcntl_failures = False):
+          if (not mutex.acquire(block = False)):
+            raise Exception("ERROR: FcntlBasedProcessLock was acquired by several processes")
+          time.sleep(0.1)
+          mutex.release()
+
+      mutex = multiprocessing.Lock()
+      process_list = []
+      for i in range(0, 3):
+        p = multiprocessing.Process(target=dummy_task, args=(i, mutex))
+        p.start()
+        process_list.append(p)
+
+      for p in process_list:
+        # join with a timeout; a still-running process leaves exitcode None,
+        # and a raised exception in dummy_task yields a non-zero exitcode —
+        # either way the assertion below fails.
+        p.join(2)
+        self.assertEquals(p.exitcode, 0)
+
+    finally:
+      shutil.rmtree(test_temp_dir)
+

+ 5 - 1
ambari-common/src/main/python/resource_management/core/logger.py

@@ -54,6 +54,10 @@ class Logger:
 
     Logger.logger = logger
 
+  @staticmethod
+  def exception(text):
+    # Logs at ERROR level; logging.Logger.exception also appends the
+    # traceback of the exception currently being handled.
+    Logger.logger.exception(Logger.filter_text(text))
+
   @staticmethod
   def error(text):
     Logger.logger.error(Logger.filter_text(text))
@@ -173,4 +177,4 @@ class Logger:
     if arguments_str:
       arguments_str = arguments_str[:-2]
         
-    return unicode("{0} {{{1}}}", 'UTF-8').format(name, arguments_str)
+    return unicode("{0} {{{1}}}", 'UTF-8').format(name, arguments_str)

+ 111 - 0
ambari-common/src/main/python/resource_management/libraries/functions/fcntl_based_process_lock.py

@@ -0,0 +1,111 @@
+#!/usr/bin/env python
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+Ambari Agent
+
+"""
+
+from resource_management.core.logger import Logger
+
+class FcntlBasedProcessLock(object):
+  """A file descriptor based lock for interprocess locking.
+  The lock is automatically released when process dies.
+
+  WARNING: A file system and OS must support fcntl.lockf.
+  Doesn't work on Windows systems. Doesn't work properly on
+  some NFS implementations.
+
+  Currently Ambari uses FcntlBasedProcessLock only when parallel
+  execution is enabled on the agent.
+
+  WARNING: Do not use this lock for synchronization between threads.
+  Multiple threads in a same process can simultaneously acquire this lock.
+  It should be used only for locking between processes.
+  """
+
+  def __init__(self, lock_file_path, skip_fcntl_failures, enabled = True):
+    """
+    :param lock_file_path: The path to the file used for locking
+    :param skip_fcntl_failures: Use this only if the lock is not mandatory.
+                                If set to True, the lock will ignore fcntl call failures.
+                                Locking will not work, if fcntl is not supported.
+                                skip_fcntl_failures prevents exceptions raising in this case.
+    :param enabled: If is set to False, fcntl will not be imported and lock/unlock methods return immediately.
+    """
+    self.lock_file_name = lock_file_path
+    self.lock_file = None
+    self.acquired = False
+    self.skip_fcntl_failures = skip_fcntl_failures
+    self.enabled = enabled
+
+  def blocking_lock(self):
+    """
+    Creates the lock file if it doesn't exist.
+    Waits to acquire an exclusive lock on the lock file descriptor.
+    """
+    if not self.enabled:
+      return
+    import fcntl
+    Logger.info("Trying to acquire a lock on {0}".format(self.lock_file_name))
+    if self.lock_file is None or self.lock_file.closed:
+      self.lock_file = open(self.lock_file_name, 'a')
+    try:
+      fcntl.lockf(self.lock_file, fcntl.LOCK_EX)
+    except:
+      if self.skip_fcntl_failures:
+        Logger.exception("Fcntl call raised an exception. A lock was not aquired. "
+                         "Continuing as skip_fcntl_failures is set to True")
+      else:
+        raise
+    else:
+      self.acquired = True
+      Logger.info("Acquired the lock on {0}".format(self.lock_file_name))
+
+  def unlock(self):
+    """
+    Unlocks the lock file descriptor.
+    """
+    if not self.enabled:
+      return
+    import fcntl
+    Logger.info("Releasing the lock on {0}".format(self.lock_file_name))
+    if self.acquired:
+      try:
+        fcntl.lockf(self.lock_file, fcntl.LOCK_UN)
+      except:
+        if self.skip_fcntl_failures:
+          Logger.exception("Fcntl call raised an exception. The lock was not released. "
+                           "Continuing as skip_fcntl_failures is set to True")
+        else:
+          raise
+      else:
+        self.acquired = False
+        try:
+          self.lock_file.close()
+          self.lock_file = None
+        except IOError:
+          Logger.warning("Failed to close {0}".format(self.lock_file_name))
+
+  def __enter__(self):
+    self.blocking_lock()
+    return None
+
+  def __exit__(self, exc_type, exc_val, exc_tb):
+    self.unlock()
+    return False
+

+ 8 - 0
ambari-server/src/main/resources/stacks/HDP/2.0.6/hooks/after-INSTALL/scripts/params.py

@@ -17,6 +17,8 @@ limitations under the License.
 
 """
 
+import os
+
 from ambari_commons.constants import AMBARI_SUDO_BINARY
 from resource_management.libraries.script import Script
 from resource_management.libraries.functions import default
@@ -26,9 +28,12 @@ from resource_management.libraries.functions import format_jvm_option
 from resource_management.libraries.functions.version import format_stack_version
 
 config = Script.get_config()
+tmp_dir = Script.get_tmp_dir()
 
 dfs_type = default("/commandParams/dfs_type", "")
 
+is_parallel_execution_enabled = int(default("/agentConfigParams/agent/parallel_execution", 0)) == 1
+
 sudo = AMBARI_SUDO_BINARY
 
 stack_version_unformatted = config['hostLevelParams']['stack_version']
@@ -89,3 +94,6 @@ has_namenode = not len(namenode_host) == 0
 
 if has_namenode or dfs_type == 'HCFS':
   hadoop_conf_dir = conf_select.get_hadoop_conf_dir(force_latest_on_upgrade=True)
+
+link_configs_lock_file = os.path.join(tmp_dir, "link_configs_lock_file")
+stack_select_lock_file = os.path.join(tmp_dir, "stack_select_lock_file")

+ 9 - 3
ambari-server/src/main/resources/stacks/HDP/2.0.6/hooks/after-INSTALL/scripts/shared_initialization.py

@@ -24,6 +24,7 @@ from resource_management.libraries.functions import conf_select
 from resource_management.libraries.functions import stack_select
 from resource_management.libraries.functions.format import format
 from resource_management.libraries.functions.version import compare_versions
+from resource_management.libraries.functions.fcntl_based_process_lock import FcntlBasedProcessLock
 from resource_management.libraries.resources.xml_config import XmlConfig
 from resource_management.libraries.script import Script
 
@@ -41,8 +42,10 @@ def setup_stack_symlinks():
     # try using the exact version first, falling back in just the stack if it's not defined
     # which would only be during an initial cluster installation
     version = params.current_version if params.current_version is not None else params.stack_version_unformatted
-    stack_select.select_all(version)
 
+    # On parallel command execution this should be executed by a single process at a time.
+    with FcntlBasedProcessLock(params.stack_select_lock_file, enabled = params.is_parallel_execution_enabled, skip_fcntl_failures = True):
+      stack_select.select_all(version)
 
 def setup_config():
   import params
@@ -86,6 +89,7 @@ def link_configs(struct_out_file):
   """
   Links configs, only on a fresh install of HDP-2.3 and higher
   """
+  import params
 
   if not Script.is_stack_greater_or_equal("2.3"):
     Logger.info("Can only link configs for HDP-2.3 and higher.")
@@ -97,5 +101,7 @@ def link_configs(struct_out_file):
     Logger.info("Could not load 'version' from {0}".format(struct_out_file))
     return
 
-  for k, v in conf_select.get_package_dirs().iteritems():
-    conf_select.convert_conf_directories_to_symlinks(k, json_version, v)
+  # On parallel command execution this should be executed by a single process at a time.
+  with FcntlBasedProcessLock(params.link_configs_lock_file, enabled = params.is_parallel_execution_enabled, skip_fcntl_failures = True):
+    for k, v in conf_select.get_package_dirs().iteritems():
+      conf_select.convert_conf_directories_to_symlinks(k, json_version, v)