瀏覽代碼

AMBARI-6063 "ambari-server start command" hangs if was executed via ssh command (dsen)

Dmitry Sen 11 年之前
父節點
當前提交
cbf926a42f

+ 33 - 19
ambari-server/src/main/python/ambari-server.py

@@ -129,6 +129,7 @@ SETUP_OR_UPGRADE_MSG = "- If this is a new setup, then run the \"ambari-server s
 "- If this is an upgrade of an existing setup, run the \"ambari-server upgrade\" command.\n" \
 "Refer to the Ambari documentation for more information on setup and upgrade."
 
+AMBARI_SERVER_DIE_MSG = "Ambari Server java process died with exitcode {0}. Check {1} for more information."
 #SSL certificate metainfo
 COMMON_NAME_ATTR = 'CN'
 NOT_BEFORE_ATTR = 'notBefore'
@@ -146,7 +147,7 @@ SERVER_START_CMD = "{0}" + os.sep + "bin" + os.sep +\
                  os.getenv('AMBARI_JVM_ARGS', '-Xms512m -Xmx2048m') +\
                  " -cp {1}" + os.pathsep + "{2}" +\
                  " org.apache.ambari.server.controller.AmbariServer "\
-                 ">" + SERVER_OUT_FILE + " 2>&1"
+                 ">" + SERVER_OUT_FILE + " 2>&1 || echo $? > {3} &"
 SERVER_START_CMD_DEBUG = "{0}" + os.sep + "bin" + os.sep +\
                        "java -server -XX:NewRatio=2 -XX:+UseConcMarkSweepGC " +\
                        ambari_provider_module_option +\
@@ -154,7 +155,7 @@ SERVER_START_CMD_DEBUG = "{0}" + os.sep + "bin" + os.sep +\
                        " -Xdebug -Xrunjdwp:transport=dt_socket,address=5005,"\
                        "server=y,suspend=n -cp {1}" + os.pathsep + "{2}" +\
                        " org.apache.ambari.server.controller.AmbariServer"
-SERVER_WRITE_PID_FILE_CMD = "pgrep -f 'org.apache.ambari.server.controller.AmbariServer' > {0}"
+SERVER_SEARCH_PATTERN = "org.apache.ambari.server.controller.AmbariServer"
 SECURITY_PROVIDER_GET_CMD = "{0}" + os.sep + "bin" + os.sep + "java -cp {1}" +\
                           os.pathsep + "{2} " +\
                           "org.apache.ambari.server.security.encryption" +\
@@ -182,7 +183,8 @@ STACK_UPGRADE_HELPER_CMD = "{0}" + os.sep + "bin" + os.sep + "java -cp {1}" +\
                           os.pathsep + "{2} " +\
                           "org.apache.ambari.server.upgrade.StackUpgradeHelper" +\
                           " {3} {4} > " + SERVER_OUT_FILE + " 2>&1"
-
+ULIMIT_CMD = "ulimit -n"
+SERVER_INIT_TIMEOUT = 5
 SERVER_START_TIMEOUT = 10
 SECURITY_KEYS_DIR = "security.server.keys_dir"
 SECURITY_MASTER_KEY_LOCATION = "security.master.key.location"
@@ -226,6 +228,7 @@ JAVA_HOME = "JAVA_HOME"
 PID_DIR = "/var/run/ambari-server"
 BOOTSTRAP_DIR_PROPERTY = "bootstrap.dir"
 PID_NAME = "ambari-server.pid"
+EXITCODE_NAME = "ambari-server.exitcode"
 AMBARI_PROPERTIES_FILE = "ambari.properties"
 AMBARI_PROPERTIES_RPMSAVE_FILE = "ambari.properties.rpmsave"
 RESOURCES_DIR_PROPERTY = "resources.dir"
@@ -2554,10 +2557,17 @@ def start(args):
 
   pidfile = PID_DIR + os.sep + PID_NAME
   command_base = SERVER_START_CMD_DEBUG if (SERVER_DEBUG_MODE or SERVER_START_DEBUG) else SERVER_START_CMD
-  command_base = "ulimit -n " + str(get_ulimit_open_files()) + "; " + command_base
-  command = command_base.format(jdk_path, conf_dir, get_ambari_classpath(), pidfile)
+  command = "%s %s; %s" % (ULIMIT_CMD, str(get_ulimit_open_files()),
+                           command_base.format(jdk_path,
+                                               conf_dir,
+                                               get_ambari_classpath(),
+                                               pidfile,
+                                               os.path.join(PID_DIR, EXITCODE_NAME))
+                           )
   if not os.path.exists(PID_DIR):
     os.makedirs(PID_DIR, 0755)
+
+    #For properly daemonization server should be started using shell as parent
   if is_root() and ambari_user != "root":
     # To inherit exported environment variables (especially AMBARI_PASSPHRASE),
     # from subprocess, we have to skip --login option of su command. That's why
@@ -2567,19 +2577,23 @@ def start(args):
     param_list = [utils.locate_file('su', '/bin'), ambari_user, "-s", utils.locate_file('sh', '/bin'), "-c", command]
   else:
     param_list = [utils.locate_file('sh', '/bin'), "-c", command]
-  pid_params_list = [utils.locate_file('sh', '/bin'), "-c", SERVER_WRITE_PID_FILE_CMD.format(pidfile)]
+
   print_info_msg("Running server: " + str(param_list))
-  server_process = subprocess.Popen(param_list, env=environ)
-  #wait for server process for SERVER_START_TIMEOUT seconds, if err_code None -
-  #server keep running normally and we consider that there is no errors
+  subprocess.Popen(param_list, env=environ)
+
+  #wait for server process for SERVER_START_TIMEOUT seconds
   print "Waiting for server start..."
-  err_code = wait_popen(server_process, SERVER_START_TIMEOUT)
-  if  err_code is not None:
-    raise FatalException(-1, "Ambari Server java process died with exit code {0}. Check {1} for more information.".format(err_code, SERVER_OUT_FILE))
-  write_pid_file_process = subprocess.Popen(pid_params_list, env=environ)
-  print "Server PID at: "+pidfile
-  print "Server out at: "+SERVER_OUT_FILE
-  print "Server log at: "+SERVER_LOG_FILE
+
+  pids = utils.looking_for_pid(SERVER_SEARCH_PATTERN, SERVER_INIT_TIMEOUT)
+  if utils.wait_for_pid(pids, SERVER_START_TIMEOUT) <= 0:
+    exitcode = utils.check_exitcode(os.path.join(PID_DIR, EXITCODE_NAME))
+    raise FatalException(-1, AMBARI_SERVER_DIE_MSG.format(exitcode, SERVER_OUT_FILE))
+  else:
+    utils.save_main_pid_ex(pids, pidfile, [utils.locate_file('sh', '/bin'),
+                                 utils.locate_file('bash', '/bin')], True)
+    print "Server PID at: "+pidfile
+    print "Server out at: "+SERVER_OUT_FILE
+    print "Server log at: "+SERVER_LOG_FILE
 
 
 #
@@ -2620,10 +2634,10 @@ def upgrade_stack(args, stack_id, repo_url=None, repo_url_os=None):
     local_repo_check_commamd = 'yum repolist | grep "{0} "'
   elif OS_FAMILY == OSConst.SUSE_FAMILY:
     local_repo_check_commamd = 'zypper repos | grep "{0} "'
-  
+
   command = local_repo_check_commamd.format(stack_id)
   (retcode, stdout, stderr) = run_in_shell(command)
-  
+
   if not retcode == 0 and repo_url is None:
     raise FatalException(retcode, 'Repository for ' + stack_id + " is not existed")
 
@@ -4261,7 +4275,7 @@ def main():
   matches = 0
   for args_number_required in possible_args_numbers:
     matches += int(len(args) == args_number_required)
-      
+
   if matches == 0:
     print parser.print_help()
     possible_args = ' or '.join(str(x) for x in possible_args_numbers)

+ 117 - 4
ambari-server/src/main/python/ambari_server/utils.py

@@ -18,6 +18,8 @@ See the License for the specific language governing permissions and
 limitations under the License.
 '''
 import os
+import signal
+import time
 from common_functions import OSConst
 
 #PostgreSQL settings
@@ -29,20 +31,131 @@ PG_STATUS_RUNNING_DEFAULT = "running"
 ENV_PATH_DEFAULT = ['/bin', '/usr/bin', '/sbin', '/usr/sbin']  # default search path
 ENV_PATH = os.getenv('PATH', '').split(':') + ENV_PATH_DEFAULT
 
+#Process
+PROC_DIR = '/proc'
+PROC_CMDLINE = 'cmdline'
+PROC_EXEC = 'exe'
+
 
   # ToDo: move that function to common-functions
 def locate_file(filename, default=''):
   """Locate command path according to OS environment"""
   for path in ENV_PATH:
-    path = "%s/%s" % (path, filename)
+    path = os.path.join(path, filename)
     if os.path.isfile(path):
       return path
   if default != '':
-    return "%s/%s" % (default, filename)
+    return os.path.join(default, filename)
   else:
     return filename
 
 
+def check_exitcode(exitcode_file_path):
+  """
+    Return exitcode of application, which is stored in the exitcode_file_path
+  """
+  exitcode = -1
+  if os.path.isfile(exitcode_file_path):
+    try:
+      f = open(exitcode_file_path, "rb")
+      exitcode = int(f.read())
+      f.close()
+      os.remove(exitcode_file_path)
+    except IOError:
+      pass
+  return exitcode
+
+
+def save_main_pid_ex(pids, pidfile, exclude_list=[], kill_exclude_list=False):
+  """
+    Save pid which is not included to exclude_list to pidfile.
+    If kill_exclude_list is set to true,  all processes in that
+    list would be killed. It's might be useful to daemonize child process
+
+    exclude_list contains list of full executable paths which should be excluded
+  """
+  try:
+    pfile = open(pidfile, "w")
+    for item in pids:
+      if pid_exists(item["pid"]) and (item["exe"] not in exclude_list):
+        pfile.write("%s\n" % item["pid"])
+      if pid_exists(item["pid"]) and (item["exe"] in exclude_list):
+        try:
+          os.kill(int(item["pid"]), signal.SIGKILL)
+        except:
+          pass
+  except IOError:
+    pass
+  finally:
+    try:
+      pfile.close()
+    except:
+      pass
+
+
+def wait_for_pid(pids, timeout):
+  """
+    Check pid for existence during timeout
+  """
+  tstart = time.time()
+  pid_live = 0
+  while int(time.time()-tstart) <= timeout and len(pids) > 0:
+    pid_live = 0
+    for item in pids:
+      if pid_exists(item["pid"]):
+        pid_live += 1
+    time.sleep(1)
+  return pid_live
+
+
+def get_symlink_path(path_to_link):
+  """
+    Expand symlink to real file path
+  """
+  return os.path.normpath(os.path.join(
+    os.path.dirname(path_to_link),
+    os.readlink(path_to_link)
+  ))
+
+
+def looking_for_pid(pattern, wait_time=1):
+  """
+    Searching for pid according to given pattern of command line
+    during wait_time.
+    Wait time is required to give a time to process to be executed.
+
+    Return list of PID Items, which match the pattern.
+  """
+  tstart = time.time()
+  found_pids = []
+
+  while int(time.time()-tstart) <= wait_time:
+    pids = [pid for pid in os.listdir(PROC_DIR) if pid.isdigit()]
+    found_pids = []  # clear list
+    for pid in pids:
+      try:
+        arg = open(os.path.join(PROC_DIR, pid, PROC_CMDLINE), 'rb').read()
+        if pattern in arg:
+          found_pids += [{
+            "pid": pid,
+            "exe": get_symlink_path(os.path.join(PROC_DIR, pid, PROC_EXEC)),
+            "cmd": arg.replace('\x00', ' ').strip()
+          }]
+      except:
+        pass
+    if wait_time == 1:  # to support unit test
+      break
+    time.sleep(1)
+  return found_pids
+
+
+def pid_exists(pid):
+  """
+   Check if pid is exist
+  """
+  return os.path.exists(os.path.join(PROC_DIR, pid))
+
+
 def get_ubuntu_pg_version():
   """Return installed version of postgre server. In case of several
   installed versions will be returned a more new one.
@@ -60,7 +173,7 @@ def get_ubuntu_pg_version():
 def get_postgre_hba_dir(OS):
   """Return postgre hba dir location depends on OS"""
   if OS == OSConst.OS_UBUNTU:
-    return "%s/%s/main" % (UBUNTU_PG_HBA_ROOT, get_ubuntu_pg_version())
+    return os.path.join(UBUNTU_PG_HBA_ROOT, get_ubuntu_pg_version(), "main")
   else:
     return PG_HBA_ROOT_DEFAULT
 
@@ -68,6 +181,6 @@ def get_postgre_hba_dir(OS):
 def get_postgre_running_status(OS):
   """Return postgre running status indicator"""
   if OS == OSConst.OS_UBUNTU:
-    return "%s/main" % get_ubuntu_pg_version()
+    return os.path.join(get_ubuntu_pg_version(), "main")
   else:
     return PG_STATUS_RUNNING_DEFAULT

+ 23 - 11
ambari-server/src/test/python/TestAmbariServer.py

@@ -37,7 +37,7 @@ from ambari_server.resourceFilesKeeper import ResourceFilesKeeper, KeeperExcepti
 with patch("platform.linux_distribution", return_value = ('Suse','11','Final')):
   # We have to use this import HACK because the filename contains a dash
   ambari_server = __import__('ambari-server')
-  
+
 FatalException = ambari_server.FatalException
 NonFatalException = ambari_server.NonFatalException
 
@@ -2512,7 +2512,10 @@ MIIFHjCCAwYCCQDpHKOBI+Lt0zANBgkqhkiG9w0BAQUFADBRMQswCQYDVQQGEwJV
     self.assertEqual(None, rcode)
     self.assertTrue(setup_db_mock.called)
 
-  @patch.object(ambari_server, "wait_popen", new = MagicMock(return_value=None))
+  @patch.object(ambari_server.utils, 'looking_for_pid')
+  @patch.object(ambari_server.utils, 'wait_for_pid')
+  @patch.object(ambari_server.utils, 'save_main_pid_ex')
+  @patch.object(ambari_server.utils, 'check_exitcode')
   @patch('os.makedirs')
   @patch.object(ambari_server.utils, 'locate_file')
   @patch.object(ambari_server, 'is_server_runing')
@@ -2540,21 +2543,30 @@ MIIFHjCCAwYCCQDpHKOBI+Lt0zANBgkqhkiG9w0BAQUFADBRMQswCQYDVQQGEwJV
   @patch("os.chdir")
   @patch.object(ResourceFilesKeeper, "perform_housekeeping")
   def test_start(self, perform_housekeeping_mock, chdir_mock, getuser_mock,
-                  find_jdbc_driver_mock, is_root_mock, read_ambari_user_mock,
-                  parse_properties_file_mock, check_postgre_up_mock,
-                  print_error_msg_mock, find_jdk_mock, search_file_mock,
-                  print_info_msg_mock, popenMock, openMock, pexistsMock,
-                  get_ambari_properties_mock, os_environ_mock,
-                  get_validated_string_input_method, os_chmod_method,
-                  save_master_key_method, get_master_key_location_method,
-                  getpwnam_mock, os_chown_mock, is_server_running_mock, locate_file_mock,
-                  os_makedirs_mock):
+                 find_jdbc_driver_mock, is_root_mock, read_ambari_user_mock,
+                 parse_properties_file_mock, check_postgre_up_mock,
+                 print_error_msg_mock, find_jdk_mock, search_file_mock,
+                 print_info_msg_mock, popenMock, openMock, pexistsMock,
+                 get_ambari_properties_mock, os_environ_mock,
+                 get_validated_string_input_method, os_chmod_method,
+                 save_master_key_method, get_master_key_location_method,
+                 getpwnam_mock, os_chown_mock, is_server_running_mock, locate_file_mock,
+                 os_makedirs_mock, check_exitcode_mock, save_main_pid_ex_mock,
+                 wait_for_pid_mock, looking_for_pid_mock):
      args = MagicMock()
      locate_file_mock.side_effect = lambda *args: '/bin/su' if args[0] == 'su' else '/bin/sh'
      f = MagicMock()
      f.readline.return_value = 42
      openMock.return_value = f
 
+     looking_for_pid_mock.return_value = [{
+          "pid": "777",
+          "exe": "/test",
+          "cmd": "test arg"
+     }]
+     wait_for_pid_mock.return_value = 1
+     check_exitcode_mock.return_value = 0
+
      p = get_ambari_properties_mock.return_value
      p.get_property.return_value = 'False'
      search_file_mock.return_value = None

+ 115 - 0
ambari-server/src/test/python/TestUtils.py

@@ -61,3 +61,118 @@ class TestUtils(TestCase):
     # Testing default vaule
     isfile_mock.return_value = False
     self.assertEquals('/tmp/myfile', utils.locate_file('myfile', '/tmp'))
+
+  @patch('os.path.exists')
+  @patch('os.path.join')
+  def test_pid_exists(self, path_join_mock, path_exists_mock):
+    path_join_mock.return_value = '/test'
+    path_exists_mock.return_value = True
+    self.assertTrue(utils.pid_exists('1'))
+
+  @patch('time.time')
+  @patch('__builtin__.open')
+  @patch('time.sleep')
+  @patch('os.listdir')
+  @patch('os.path.join')
+  @patch.object(utils, 'get_symlink_path')
+  def test_looking_for_pid(self, get_symlink_path_mock, path_join_mock,
+                      listdir_mock, sleep_mock, open_mock, time_mock):
+    def test_read():
+      return "test args"
+
+    def test_obj():
+      pass
+
+    test_obj.read = test_read
+    path_join_mock.return_value = '/'
+    open_mock.return_value = test_obj
+    listdir_mock.return_value = ['1000']
+    get_symlink_path_mock.return_value = "/symlinkpath"
+    time_mock.side_effect = [0, 0, 2]
+
+    r = utils.looking_for_pid("test args", 1)
+    self.assertEquals(len(r), 1)
+    self.assertEquals(r[0], {
+       "pid": "1000",
+       "exe": "/symlinkpath",
+       "cmd": "test args"
+      })
+
+  @patch('os.path.normpath')
+  @patch('os.path.join')
+  @patch('os.path.dirname')
+  @patch('os.readlink')
+  def test_get_symlink_path(self, readlink_mock, dirname_mock, join_mock,
+                            normpath_mock):
+    normpath_mock.return_value = "test value"
+    self.assertEquals(utils.get_symlink_path("/"), "test value")
+
+  @patch('time.time')
+  @patch.object(utils, 'pid_exists')
+  @patch('time.sleep')
+  def test_wait_for_pid(self, sleep_mock, pid_exists_mock, time_mock):
+    pid_exists_mock.return_value = True
+    time_mock.side_effect = [0, 0, 2]
+    live_pids = utils.wait_for_pid([
+                                   {"pid": "111",
+                                    "exe": "",
+                                    "cmd": ""
+                                    },
+                                   {"pid": "222",
+                                    "exe": "",
+                                    "cmd": ""
+                                    },
+                                   ], 1)
+
+    self.assertEquals(2, live_pids)
+
+  @patch.object(utils, 'pid_exists')
+  @patch('__builtin__.open')
+  @patch('os.kill')
+  def test_save_main_pid_ex(self, kill_mock, open_mock, pid_exists_mock):
+    def test_write(data):
+      self.assertEquals(data, "222\n")
+
+    def test_close():
+      pass
+
+    def test_obj():
+      pass
+
+    test_obj.write = test_write
+    test_obj.close = test_close
+    open_mock.return_value = test_obj
+    pid_exists_mock.return_value = True
+
+    utils.save_main_pid_ex([{"pid": "111",
+                             "exe": "/exe1",
+                             "cmd": ""
+                             },
+                            {"pid": "222",
+                             "exe": "/exe2",
+                             "cmd": ""
+                             },
+                            ], "/pidfile", ["/exe1"], True)
+    self.assertEquals(open_mock.call_count, 1)
+    self.assertEquals(pid_exists_mock.call_count, 4)
+    self.assertEquals(kill_mock.call_count, 1)
+
+  @patch('os.path.isfile')
+  @patch('__builtin__.open')
+  @patch('os.remove')
+  def test_check_exitcode(self, remove_mock, open_mock, isfile_mock):
+    def test_read():
+      return "777"
+
+    def test_close():
+      pass
+
+    def test_obj():
+      pass
+
+    test_obj.read = test_read
+    test_obj.close = test_close
+    open_mock.return_value = test_obj
+    isfile_mock.return_value = True
+
+    self.assertEquals(utils.check_exitcode("/tmp/nofile"), 777)