Selaa lähdekoodia

AMBARI-14847 - Concurrent kinit Commands Cause Alerts To Randomly Trigger (jonathanhurley)

Jonathan Hurley 9 vuotta sitten
vanhempi
commit
3ab6a3a836

+ 46 - 0
ambari-common/src/main/python/resource_management/core/global_lock.py

@@ -0,0 +1,46 @@
+#!/usr/bin/env python
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+Ambari Agent
+
+"""
+import threading
+from resource_management.core.exceptions import Fail
+
+# concurrent kinit's can cause the following error:
+# Internal credentials cache error while storing credentials while getting initial credentials
+LOCK_TYPE_KERBEROS = "KERBEROS_LOCK"
+
+# dictionary of all global lock instances
+__GLOBAL_LOCKS = {
+  LOCK_TYPE_KERBEROS : threading.RLock()
+}
+
+def get_lock(lock_type):
+  """
+  Gets the global lock associated with the specified type. This does not actually acquire
+  the lock, it simply returns the RLock instance. It is up to the caller to invoke RLock.acquire()
+  and RLock.release() correctly.
+  :param lock_type:
+  :return: a global threading.RLock() instance
+  :rtype: threading.RLock()
+  """
+  if lock_type not in __GLOBAL_LOCKS:
+    raise Fail("There is no global lock associated with {0}".format(str(lock_type)))
+
+  return __GLOBAL_LOCKS[lock_type]

+ 28 - 19
ambari-common/src/main/python/resource_management/libraries/functions/curl_krb_request.py

@@ -24,13 +24,15 @@ __all__ = ["curl_krb_request"]
 import logging
 import logging
 import os
 import os
 import time
 import time
-import subprocess
+import threading
 
 
+from resource_management.core import global_lock
 from resource_management.core import shell
 from resource_management.core import shell
 from resource_management.core.exceptions import Fail
 from resource_management.core.exceptions import Fail
 from get_kinit_path import get_kinit_path
 from get_kinit_path import get_kinit_path
 from get_klist_path import get_klist_path
 from get_klist_path import get_klist_path
 from resource_management.libraries.functions.get_user_call_output import get_user_call_output
 from resource_management.libraries.functions.get_user_call_output import get_user_call_output
+
 # hashlib is supplied as of Python 2.5 as the replacement interface for md5
 # hashlib is supplied as of Python 2.5 as the replacement interface for md5
 # and other secure hashes.  In 2.6, md5 is deprecated.  Import hashlib if
 # and other secure hashes.  In 2.6, md5 is deprecated.  Import hashlib if
 # available, avoiding a deprecation warning under 2.6.  Import md5 otherwise,
 # available, avoiding a deprecation warning under 2.6.  Import md5 otherwise,
@@ -47,7 +49,6 @@ MAX_TIMEOUT_DEFAULT = CONNECTION_TIMEOUT_DEFAULT + 2
 
 
 logger = logging.getLogger()
 logger = logging.getLogger()
 
 
-
 def curl_krb_request(tmp_dir, keytab, principal, url, cache_file_prefix,
 def curl_krb_request(tmp_dir, keytab, principal, url, cache_file_prefix,
     krb_exec_search_paths, return_only_http_code, alert_name, user,
     krb_exec_search_paths, return_only_http_code, alert_name, user,
     connection_timeout = CONNECTION_TIMEOUT_DEFAULT):
     connection_timeout = CONNECTION_TIMEOUT_DEFAULT):
@@ -62,25 +63,33 @@ def curl_krb_request(tmp_dir, keytab, principal, url, cache_file_prefix,
   ccache_file_path = "{0}{1}{2}_{3}_cc_{4}".format(tmp_dir, os.sep, cache_file_prefix, user, ccache_file_name)
   ccache_file_path = "{0}{1}{2}_{3}_cc_{4}".format(tmp_dir, os.sep, cache_file_prefix, user, ccache_file_name)
   kerberos_env = {'KRB5CCNAME': ccache_file_path}
   kerberos_env = {'KRB5CCNAME': ccache_file_path}
 
 
-  # If there are no tickets in the cache or they are expired, perform a kinit, else use what
-  # is in the cache
-  if krb_exec_search_paths:
-    klist_path_local = get_klist_path(krb_exec_search_paths)
-  else:
-    klist_path_local = get_klist_path()
-
-  if shell.call("{0} -s {1}".format(klist_path_local, ccache_file_path), user=user)[0] != 0:
+  # concurrent kinit's can cause the following error:
+  # Internal credentials cache error while storing credentials while getting initial credentials
+  kinit_lock = global_lock.get_lock(global_lock.LOCK_TYPE_KERBEROS)
+  kinit_lock.acquire()
+  try:
+    # If there are no tickets in the cache or they are expired, perform a kinit, else use what
+    # is in the cache
     if krb_exec_search_paths:
     if krb_exec_search_paths:
-      kinit_path_local = get_kinit_path(krb_exec_search_paths)
+      klist_path_local = get_klist_path(krb_exec_search_paths)
     else:
     else:
-      kinit_path_local = get_kinit_path()
-    logger.debug("[Alert][{0}] Enabling Kerberos authentication via GSSAPI using ccache at {1}.".format(
-      alert_name, ccache_file_path))
-
-    shell.checked_call("{0} -l 5m -c {1} -kt {2} {3} > /dev/null".format(kinit_path_local, ccache_file_path, keytab, principal), user=user)
-  else:
-    logger.debug("[Alert][{0}] Kerberos authentication via GSSAPI already enabled using ccache at {1}.".format(
-      alert_name, ccache_file_path))
+      klist_path_local = get_klist_path()
+
+    if shell.call("{0} -s {1}".format(klist_path_local, ccache_file_path), user=user)[0] != 0:
+      if krb_exec_search_paths:
+        kinit_path_local = get_kinit_path(krb_exec_search_paths)
+      else:
+        kinit_path_local = get_kinit_path()
+
+      logger.debug("[Alert][{0}] Enabling Kerberos authentication via GSSAPI using ccache at {1}.".format(
+        alert_name, ccache_file_path))
+
+      shell.checked_call("{0} -l 5m -c {1} -kt {2} {3} > /dev/null".format(kinit_path_local, ccache_file_path, keytab, principal), user=user)
+    else:
+      logger.debug("[Alert][{0}] Kerberos authentication via GSSAPI already enabled using ccache at {1}.".format(
+        alert_name, ccache_file_path))
+  finally:
+    kinit_lock.release()
 
 
   # check if cookies dir exists, if not then create it
   # check if cookies dir exists, if not then create it
   cookies_dir = os.path.join(tmp_dir, "cookies")
   cookies_dir = os.path.join(tmp_dir, "cookies")

+ 14 - 9
ambari-common/src/main/python/resource_management/libraries/functions/hive_check.py

@@ -18,9 +18,7 @@ See the License for the specific language governing permissions and
 limitations under the License.
 limitations under the License.
 '''
 '''
 
 
-import socket
-
-from resource_management.core.exceptions import Fail
+from resource_management.core import global_lock
 from resource_management.core.resources import Execute
 from resource_management.core.resources import Execute
 from resource_management.libraries.functions import format
 from resource_management.libraries.functions import format
 
 
@@ -55,15 +53,22 @@ def check_thrift_port_sasl(address, port, hive_auth="NOSASL", key=None, kinitcmd
   if ssl and ssl_keystore is not None and ssl_password is not None:
   if ssl and ssl_keystore is not None and ssl_password is not None:
     beeline_url.extend(['ssl={ssl_str}', 'sslTrustStore={ssl_keystore}', 'trustStorePassword={ssl_password!p}'])
     beeline_url.extend(['ssl={ssl_str}', 'sslTrustStore={ssl_keystore}', 'trustStorePassword={ssl_password!p}'])
 
 
-  # append url according to kerberos setting
+  # append url according to principal and execute kinit
   if kinitcmd:
   if kinitcmd:
     beeline_url.append('principal={key}')
     beeline_url.append('principal={key}')
-    Execute(kinitcmd, user=smokeuser)
+
+    # prevent concurrent kinit
+    kinit_lock = global_lock.get_lock(global_lock.LOCK_TYPE_KERBEROS)
+    kinit_lock.acquire()
+    try:
+      Execute(kinitcmd, user=smokeuser)
+    finally:
+      kinit_lock.release()
 
 
   cmd = "! beeline -u '%s' -e '' 2>&1| awk '{print}'|grep -i -e 'Connection refused' -e 'Invalid URL'" % \
   cmd = "! beeline -u '%s' -e '' 2>&1| awk '{print}'|grep -i -e 'Connection refused' -e 'Invalid URL'" % \
         format(";".join(beeline_url))
         format(";".join(beeline_url))
+
   Execute(cmd,
   Execute(cmd,
-          user=smokeuser,
-          path=["/bin/", "/usr/bin/", "/usr/lib/hive/bin/", "/usr/sbin/"],
-          timeout=check_command_timeout
-  )
+    user=smokeuser,
+    path=["/bin/", "/usr/bin/", "/usr/lib/hive/bin/", "/usr/sbin/"],
+    timeout=check_command_timeout)

+ 11 - 4
ambari-server/src/main/resources/common-services/HIVE/0.12.0.2.0/package/alerts/alert_hive_metastore.py

@@ -24,6 +24,7 @@ import time
 import traceback
 import traceback
 import logging
 import logging
 
 
+from resource_management.core import global_lock
 from resource_management.libraries.functions import format
 from resource_management.libraries.functions import format
 from resource_management.libraries.functions import get_kinit_path
 from resource_management.libraries.functions import get_kinit_path
 from resource_management.core.resources import Execute
 from resource_management.core.resources import Execute
@@ -145,13 +146,19 @@ def execute(configurations={}, parameters={}, host_name=None):
         kerberos_executable_search_paths = configurations[KERBEROS_EXECUTABLE_SEARCH_PATHS_KEY]
         kerberos_executable_search_paths = configurations[KERBEROS_EXECUTABLE_SEARCH_PATHS_KEY]
       else:
       else:
         kerberos_executable_search_paths = None
         kerberos_executable_search_paths = None
-             
+
       kinit_path_local = get_kinit_path(kerberos_executable_search_paths)
       kinit_path_local = get_kinit_path(kerberos_executable_search_paths)
       kinitcmd=format("{kinit_path_local} -kt {smokeuser_keytab} {smokeuser_principal}; ")
       kinitcmd=format("{kinit_path_local} -kt {smokeuser_keytab} {smokeuser_principal}; ")
 
 
-      Execute(kinitcmd, user=smokeuser,
-        path=["/bin/", "/usr/bin/", "/usr/lib/hive/bin/", "/usr/sbin/"],
-        timeout=10)
+      # prevent concurrent kinit
+      kinit_lock = global_lock.get_lock(global_lock.LOCK_TYPE_KERBEROS)
+      kinit_lock.acquire()
+      try:
+        Execute(kinitcmd, user=smokeuser,
+          path=["/bin/", "/usr/bin/", "/usr/lib/hive/bin/", "/usr/sbin/"],
+          timeout=10)
+      finally:
+        kinit_lock.release()
 
 
     if host_name is None:
     if host_name is None:
       host_name = socket.getfqdn()
       host_name = socket.getfqdn()

+ 0 - 6
ambari-server/src/main/resources/common-services/HIVE/0.12.0.2.0/package/alerts/alert_webhcat_server.py

@@ -26,13 +26,7 @@ import traceback
 import logging
 import logging
 
 
 from resource_management.core.environment import Environment
 from resource_management.core.environment import Environment
-from resource_management.core.resources import Execute
-from resource_management.core import shell
-from resource_management.libraries.functions import format
-from resource_management.libraries.functions import get_kinit_path
-from resource_management.libraries.functions import get_klist_path
 from resource_management.libraries.functions.curl_krb_request import curl_krb_request
 from resource_management.libraries.functions.curl_krb_request import curl_krb_request
-from os import getpid, sep
 
 
 RESULT_CODE_OK = "OK"
 RESULT_CODE_OK = "OK"
 RESULT_CODE_CRITICAL = "CRITICAL"
 RESULT_CODE_CRITICAL = "CRITICAL"

+ 11 - 4
ambari-server/src/main/resources/common-services/OOZIE/4.0.0.2.0/package/alerts/alert_check_oozie_server.py

@@ -17,17 +17,18 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 See the License for the specific language governing permissions and
 See the License for the specific language governing permissions and
 limitations under the License.
 limitations under the License.
 """
 """
+import os
+import re
+
+from resource_management.core import global_lock
 from resource_management.core.environment import Environment
 from resource_management.core.environment import Environment
 from resource_management.core.resources import Execute
 from resource_management.core.resources import Execute
-from resource_management.core.shell import call
 from resource_management.libraries.functions import format
 from resource_management.libraries.functions import format
 from resource_management.libraries.functions import get_kinit_path
 from resource_management.libraries.functions import get_kinit_path
 from resource_management.libraries.functions import get_klist_path
 from resource_management.libraries.functions import get_klist_path
 from ambari_commons.os_check import OSConst, OSCheck
 from ambari_commons.os_check import OSConst, OSCheck
 from ambari_commons.os_family_impl import OsFamilyFuncImpl, OsFamilyImpl
 from ambari_commons.os_family_impl import OsFamilyFuncImpl, OsFamilyImpl
 from urlparse import urlparse
 from urlparse import urlparse
-import os
-import re
 
 
 RESULT_CODE_OK = 'OK'
 RESULT_CODE_OK = 'OK'
 RESULT_CODE_CRITICAL = 'CRITICAL'
 RESULT_CODE_CRITICAL = 'CRITICAL'
@@ -143,7 +144,13 @@ def get_check_command(oozie_url, host_name, configurations, parameters, only_kin
     else:
     else:
       kinit_command = "{0} -s {1} || ".format(klist_path_local, ccache_file) + kinit_part_command
       kinit_command = "{0} -s {1} || ".format(klist_path_local, ccache_file) + kinit_part_command
 
 
-    Execute(kinit_command, environment=kerberos_env, user=user)
+    # prevent concurrent kinit
+    kinit_lock = global_lock.get_lock(global_lock.LOCK_TYPE_KERBEROS)
+    kinit_lock.acquire()
+    try:
+      Execute(kinit_command, environment=kerberos_env, user=user)
+    finally:
+      kinit_lock.release()
 
 
   # oozie configuration directory uses a symlink when > HDP 2.2
   # oozie configuration directory uses a symlink when > HDP 2.2
   oozie_config_directory = OOZIE_CONF_DIR_LEGACY
   oozie_config_directory = OOZIE_CONF_DIR_LEGACY

+ 55 - 0
ambari-server/src/test/python/TestGlobalLock.py

@@ -0,0 +1,55 @@
+# !/usr/bin/env python
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+
+from resource_management.core import global_lock
+from resource_management.core.exceptions import Fail
+
+from unittest import TestCase
+
+utils = __import__('ambari_server.utils').utils
+
+class TestGlobalLock(TestCase):
+  def test_get_invalid_lock(self):
+    """
+    Tests that an invalid lock throws an exception
+    :return:
+    """
+    try:
+      global_lock.get_lock("INVALID")
+      self.fail("Expected an exception when trying to retrieve an invalid lock")
+    except Fail:
+      pass
+
+  def test_get_kerberos_lock(self):
+    """
+    Tests that the kerberos lock can be retrieved.
+    :return:
+    """
+    kerberos_lock = global_lock.get_lock(global_lock.LOCK_TYPE_KERBEROS)
+    self.assertFalse(kerberos_lock is None)
+
+    kerberos_lock_2 = global_lock.get_lock(global_lock.LOCK_TYPE_KERBEROS)
+    self.assertEqual(kerberos_lock, kerberos_lock_2)
+
+    kerberos_lock.acquire()
+    kerberos_lock.release()
+
+    kerberos_lock_2.acquire()
+    kerberos_lock_2.release()