Forráskód Böngészése

AMBARI-14286. Express Upgrade: Failure while trying to restart Namenode with socket timeout error (Dmytro Grinenko via ncole)

Nate Cole 9 éve
szülő
commit
133a71d956

+ 8 - 2
ambari-common/src/main/python/ambari_commons/exceptions.py

@@ -1,6 +1,6 @@
 #!/usr/bin/env python
 
-'''
+"""
 Licensed to the Apache Software Foundation (ASF) under one
 or more contributor license agreements.  See the NOTICE file
 distributed with this work for additional information
@@ -16,7 +16,8 @@ distributed under the License is distributed on an "AS IS" BASIS,
 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 See the License for the specific language governing permissions and
 limitations under the License.
-'''
+"""
+
 
 class FatalException(Exception):
     def __init__(self, code, reason):
@@ -33,3 +34,8 @@ class NonFatalException(Exception):
 
   def __str__(self):
     return repr("NonFatal exception: %s" % self.reason)
+
+
+class TimeoutError(Exception):
+  def __str__(self):
+    return repr("Timeout error: %s" % self.message)

+ 23 - 1
ambari-common/src/main/python/ambari_commons/inet_utils.py

@@ -21,11 +21,33 @@ limitations under the License.
 import os
 import sys
 import urllib2
+import socket
+
+from exceptions import FatalException, NonFatalException, TimeoutError
 
-from exceptions import *
 from logging_utils import *
 from os_check import OSCheck
 
+
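+def openurl(url, timeout=socket._GLOBAL_DEFAULT_TIMEOUT, *args, **kwargs):
+  """
+  Open the url with urllib2.urlopen, raising TimeoutError if the socket times out.
+  :param url: url (string or urllib2.Request) to open
+  :param timeout: open timeout, raise TimeoutError on timeout
+  :rtype: file-like response object, as returned by urllib2.urlopen
+  """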
+  try:
+    return urllib2.urlopen(url, timeout=timeout, *args, **kwargs)
+  except urllib2.URLError as e:
+    # Python 2.6 timeout handling
+    if hasattr(e, "reason") and isinstance(e.reason, socket.timeout):
+      raise TimeoutError(e.reason)
+    else:
+      raise e  # re-throw exception
+  except socket.timeout as e:  # Python 2.7 timeout handling
+    raise TimeoutError(e)
+
+
 def download_file(link, destination, chunk_size=16 * 1024, progress_func = None):
   print_info_msg("Downloading {0} to {1}".format(link, destination))
   if os.path.exists(destination):

+ 39 - 1
ambari-common/src/main/python/resource_management/libraries/functions/decorator.py

@@ -21,7 +21,7 @@ Ambari Agent
 """
 
 import time
-__all__ = ['retry', ]
+__all__ = ['retry', 'safe_retry', ]
 
 from resource_management.core.logger import Logger
 
@@ -56,3 +56,41 @@ def retry(times=3, sleep_time=1, max_sleep_time=8, backoff_factor=1, err_class=E
     return wrapper
   return decorator
 
+
+def safe_retry(times=3, sleep_time=1, max_sleep_time=8, backoff_factor=1, err_class=Exception, return_on_fail=None):
+  """
+  Retry decorator for improved robustness of functions. Instead of raising an error on the last try, it will return
+  the return_on_fail value.
+  :param times: Number of times to attempt to call the function.
+  :param sleep_time: Initial sleep time between attempts
+  :param backoff_factor: After every attempt, multiply the sleep time by this factor while the result stays <= max_sleep_time.
+  :param err_class: Exception class to handle
+  :param return_on_fail: value to return if the last try fails
+  :return: Returns the output of the wrapped function, or return_on_fail if the last try fails.
+  """
+  def decorator(function):
+    def wrapper(*args, **kwargs):
+      _times = times
+      _sleep_time = sleep_time
+      _backoff_factor = backoff_factor
+      _err_class = err_class
+      _return_on_fail = return_on_fail
+
+      while _times > 1:
+        _times -= 1
+        try:
+          return function(*args, **kwargs)
+        except _err_class, err:
+          Logger.info("Will retry %d time(s), caught exception: %s. Sleeping for %d sec(s)" % (_times, str(err), _sleep_time))
+          time.sleep(_sleep_time)
+        if(_sleep_time * _backoff_factor <= max_sleep_time):
+          _sleep_time *= _backoff_factor
+
+      try:
+        return function(*args, **kwargs)
+      except _err_class, err:
+        Logger.error(str(err))
+        return _return_on_fail
+
+    return wrapper
+  return decorator

+ 79 - 52
ambari-common/src/main/python/resource_management/libraries/functions/ranger_functions.py

@@ -17,15 +17,25 @@ See the License for the specific language governing permissions and
 limitations under the License.
 
 """
+
+import re
 import time
 import sys
+import urllib2
+import base64
+import httplib
+
+# simplejson is much faster than the Python 2.6 json module and has the same set of functions.
+import ambari_simplejson as json
+
 from StringIO import StringIO as BytesIO
-import ambari_simplejson as json # simplejson is much faster comparing to Python 2.6 json module and has the same functions set.
 from resource_management.core.logger import Logger
-import urllib2, base64, httplib
+from ambari_commons.inet_utils import openurl
+from ambari_commons.exceptions import TimeoutError
 from resource_management.core.exceptions import Fail
 from resource_management.libraries.functions.format import format
-import re
+from resource_management.libraries.functions.decorator import safe_retry
+
 
 class Rangeradmin:
   sInstance = None
@@ -46,13 +56,14 @@ class Rangeradmin:
     if self.skip_if_rangeradmin_down:
       Logger.info("Rangeradmin: Skip ranger admin if it's down !")
 
+  @safe_retry(times=5, sleep_time=8, backoff_factor=1.5, err_class=Fail, return_on_fail=None)
   def get_repository_by_name_urllib2(self, name, component, status, usernamepassword):
     """
-    param name: name of the component, from which, function will search in list of repositories
-    param component: component for which repository has to be checked
-    param status: active or inactive
-    param usernamepassword: user credentials using which repository needs to be searched
-    return: Returns Ranger repository dict if found otherwise None
+    :param name: name of the component, from which, function will search in list of repositories
+    :param component: component for which repository has to be checked
+    :param status: active or inactive
+    :param usernamepassword: user credentials using which repository needs to be searched
+    :return: Returns Ranger repository dict if found otherwise None
     """
     try:
       searchRepoURL = self.urlReposPub + "?name=" + name + "&type=" + component + "&status=" + status
@@ -61,7 +72,7 @@ class Rangeradmin:
       request.add_header("Content-Type", "application/json")
       request.add_header("Accept", "application/json")
       request.add_header("Authorization", "Basic {0}".format(base64string))
-      result = urllib2.urlopen(request, timeout=20)
+      result = openurl(request, timeout=20)
       response_code = result.getcode()
       response = json.loads(result.read())
       if response_code == 200 and len(response['vXRepositories']) > 0:
@@ -81,6 +92,8 @@ class Rangeradmin:
     except httplib.BadStatusLine:
       Logger.error("Ranger Admin service is not reachable, please restart the service and then try again")
       return None
+    except TimeoutError:
+      raise Fail("Connection to Ranger Admin failed. Reason - timeout")
     
     
     
@@ -88,14 +101,14 @@ class Rangeradmin:
                                ambari_ranger_admin, ambari_ranger_password,
                                admin_uname, admin_password, policy_user):
     """
-    param component: name of the component, from which it will get or create repository
-    param repo_name: name of the repository to be get or create
-    param repo_properties: dict of repository to be create if not exist
-    param ambari_ranger_admin: ambari admin user creation username
-    param ambari_ranger_password: ambari admin user creation password
-    param admin_uname: ranger admin username
-    param admin_password: ranger admin password
-    param policy_user: use this policy user for policies that will be used during repository creation
+    :param component: name of the component, from which it will get or create repository
+    :param repo_name: name of the repository to be get or create
+    :param repo_properties: dict of repository to be create if not exist
+    :param ambari_ranger_admin: ambari admin user creation username
+    :param ambari_ranger_password: ambari admin user creation password
+    :param admin_uname: ranger admin username
+    :param admin_password: ranger admin password
+    :param policy_user: use this policy user for policies that will be used during repository creation
     """
     response_code = self.check_ranger_login_urllib2(self.baseUrl)
     repo_data = json.dumps(repo_properties)
@@ -128,14 +141,15 @@ class Rangeradmin:
       else:
         Logger.error('Ambari admin user creation failed')
     elif not self.skip_if_rangeradmin_down:
-      Logger.error("Connection failed to Ranger Admin !")
-          
+      Logger.error("Connection to Ranger Admin failed !")
+
+  @safe_retry(times=5, sleep_time=8, backoff_factor=1.5, err_class=Fail, return_on_fail=None)
   def create_repository_urllib2(self, data, usernamepassword, policy_user):
     """
-    param data: repository dict 
-    param usernamepassword: user credentials using which repository needs to be created
-    param policy_user: use this policy user for policies that will be used during repository creation
-    return: Returns created repository response else None
+    :param data: repository dict
+    :param usernamepassword: user credentials using which repository needs to be created
+    :param policy_user: use this policy user for policies that will be used during repository creation
+    :return: Returns created repository response else None
     """
     try:
       searchRepoURL = self.urlReposPub
@@ -146,7 +160,7 @@ class Rangeradmin:
       }
       request = urllib2.Request(searchRepoURL, data, headers)
       request.add_header("Authorization", "Basic {0}".format(base64string))
-      result = urllib2.urlopen(request, timeout=20)
+      result = openurl(request, timeout=20)
       response_code = result.getcode()
       response = json.loads(json.JSONEncoder().encode(result.read()))
       if response_code == 200:
@@ -190,33 +204,39 @@ class Rangeradmin:
     except httplib.BadStatusLine:
       Logger.error("Ranger Admin service is not reachable, please restart the service and then try again")
       return None
+    except TimeoutError:
+      raise Fail("Connection to Ranger Admin failed. Reason - timeout")
 
+  @safe_retry(times=5, sleep_time=8, backoff_factor=1.5, err_class=Fail, return_on_fail=None)
   def check_ranger_login_urllib2(self, url):
     """
-    param url: ranger admin host url
-    return: Returns login check response 
+    :param url: ranger admin host url
+    :return: Returns login check response
     """
     try:
-      response = urllib2.urlopen(url, timeout=20)
+      response = openurl(url, timeout=20)
       response_code = response.getcode()
       return response_code
     except urllib2.URLError, e:
       if isinstance(e, urllib2.HTTPError):
-        Logger.error("Connection failed to Ranger Admin. Http status code - {0}. \n {1}".format(e.code, e.read()))
+        Logger.error("Connection to Ranger Admin failed. Http status code - {0}. \n {1}".format(e.code, e.read()))
       else:
-        Logger.error("Connection failed to Ranger Admin. Reason - {0}.".format(e.reason))
+        Logger.error("Connection to Ranger Admin failed. Reason - {0}.".format(e.reason))
       return None
     except httplib.BadStatusLine, e:
       Logger.error("Ranger Admin service is not reachable, please restart the service and then try again")
       return None
+    except TimeoutError:
+      raise Fail("Connection to Ranger Admin failed. Reason - timeout")
 
+  @safe_retry(times=5, sleep_time=8, backoff_factor=1.5, err_class=Fail, return_on_fail=None)
   def get_policy_by_repo_name(self, name, component, status, usernamepassword):
     """
-    param name: repository name
-    param component: component name for which policy needs to be searched
-    param status: true or false
-    param usernamepassword: user credentials using which policy needs to be searched
-    return: Returns successful response else None
+    :param name: repository name
+    :param component: component name for which policy needs to be searched
+    :param status: true or false
+    :param usernamepassword: user credentials using which policy needs to be searched
+    :return: Returns successful response else None
     """
     try:
       searchPolicyURL = self.urlPolicies + "?repositoryName=" + name + "&repositoryType=" + component + "&isEnabled=" + status
@@ -225,7 +245,7 @@ class Rangeradmin:
       request.add_header("Content-Type", "application/json")
       request.add_header("Accept", "application/json")
       request.add_header("Authorization", "Basic {0}".format(base64string))
-      result = urllib2.urlopen(request, timeout=20)
+      result = openurl(request, timeout=20)
       response_code = result.getcode()
       response = json.loads(result.read())
       if response_code == 200 and len(response['vXPolicies']) > 0:
@@ -241,13 +261,16 @@ class Rangeradmin:
     except httplib.BadStatusLine:
       Logger.error("Ranger Admin service is not reachable, please restart the service and then try again")
       return None
+    except TimeoutError:
+      raise Fail("Connection to Ranger Admin failed. Reason - timeout")
 
+  @safe_retry(times=5, sleep_time=8, backoff_factor=1.5, err_class=Fail, return_on_fail=None)
   def update_ranger_policy(self, policyId, data, usernamepassword):
     """
-    param policyId: policy id which needs to be updated
-    param data: policy data that needs to be updated
-    param usernamepassword: user credentials using which policy needs to be updated
-    return: Returns successful response and response code else None
+    :param policyId: policy id which needs to be updated
+    :param data: policy data that needs to be updated
+    :param usernamepassword: user credentials using which policy needs to be updated
+    :return: Returns successful response and response code else None
     """
     try:
       searchRepoURL = self.urlPolicies + "/" + str(policyId)
@@ -259,7 +282,7 @@ class Rangeradmin:
       request = urllib2.Request(searchRepoURL, data, headers)
       request.add_header("Authorization", "Basic {0}".format(base64string))
       request.get_method = lambda: 'PUT'
-      result = urllib2.urlopen(request, timeout=20)
+      result = openurl(request, timeout=20)
       response_code = result.getcode()
       response = json.loads(json.JSONEncoder().encode(result.read()))
       if response_code == 200:
@@ -277,13 +300,15 @@ class Rangeradmin:
     except httplib.BadStatusLine:
       Logger.error("Ranger Admin service is not reachable, please restart the service and then try again")
       return None, None
+    except TimeoutError:
+      raise Fail("Connection to Ranger Admin failed. Reason - timeout")
 
   def get_policy_params(self, typeOfPolicy, policyObj, policy_user):
     """
-    param typeOfPolicy: component name for which policy has to be get
-    param policyObj: policy dict
-    param policy_user: policy user that needs to be updated
-    returns: Returns updated policy dict
+    :param typeOfPolicy: component name for which policy has to be get
+    :param policyObj: policy dict
+    :param policy_user: policy user that needs to be updated
+    :returns: Returns updated policy dict
     """    
     typeOfPolicy = typeOfPolicy.lower()
     if typeOfPolicy == "hdfs":
@@ -304,12 +329,12 @@ class Rangeradmin:
                                                 'getTopologyInfo', 'uploadNewCredential', 'admin']}]
     return policyObj
 
-
+  @safe_retry(times=5, sleep_time=8, backoff_factor=1.5, err_class=Fail, return_on_fail=None)
   def create_ambari_admin_user(self,ambari_admin_username, ambari_admin_password,usernamepassword):
     """
-    param ambari_admin_username: username of user to be created 
-    param ambari_admin_username: user password of user to be created 
-    return: Returns response code for successful user creation else None
+    :param ambari_admin_username: username of user to be created
+    :param ambari_admin_password: user password of user to be created
+    :return: Returns response code for successful user creation else None
     """
     flag_ambari_admin_present = False
     match = re.match('[a-zA-Z0-9_\S]+$', ambari_admin_password)
@@ -322,7 +347,7 @@ class Rangeradmin:
       request.add_header("Content-Type", "application/json")
       request.add_header("Accept", "application/json")
       request.add_header("Authorization", "Basic {0}".format(base64string))
-      result = urllib2.urlopen(request, timeout=20)
+      result = openurl(request, timeout=20)
       response_code = result.getcode()
       response = json.loads(result.read())
       if response_code == 200 and len(response['vXUsers']) >= 0:
@@ -346,7 +371,7 @@ class Rangeradmin:
           admin_user['password'] = ambari_admin_password
           admin_user['description'] = ambari_admin_username
           admin_user['firstName'] = ambari_admin_username
-          data =  json.dumps(admin_user)
+          data = json.dumps(admin_user)
           base64string = base64.encodestring('{0}'.format(usernamepassword)).replace('\n', '')
           headers = {
             'Accept': 'application/json',
@@ -354,8 +379,8 @@ class Rangeradmin:
           }
           request = urllib2.Request(url, data, headers)
           request.add_header("Authorization", "Basic {0}".format(base64string))
-          result = urllib2.urlopen(request, timeout=20)
-          response_code =  result.getcode()
+          result = openurl(request, timeout=20)
+          response_code = result.getcode()
           response = json.loads(json.JSONEncoder().encode(result.read()))
           if response_code == 200 and response is not None:
             Logger.info('Ambari admin user creation successful.')
@@ -375,3 +400,5 @@ class Rangeradmin:
     except httplib.BadStatusLine:
       Logger.error("Ranger Admin service is not reachable, please restart the service and then try again")
       return None
+    except TimeoutError:
+      raise Fail("Connection to Ranger Admin failed. Reason - timeout")

+ 34 - 12
ambari-common/src/main/python/resource_management/libraries/functions/ranger_functions_v2.py

@@ -17,15 +17,25 @@ See the License for the specific language governing permissions and
 limitations under the License.
 
 """
+
+import re
 import time
 import sys
+import urllib2
+import base64
+import httplib
+
+# simplejson is much faster than the Python 2.6 json module and has the same set of functions.
+import ambari_simplejson as json
+
 from StringIO import StringIO as BytesIO
-import ambari_simplejson as json # simplejson is much faster comparing to Python 2.6 json module and has the same functions set.
+from ambari_commons.inet_utils import openurl
 from resource_management.core.logger import Logger
-import urllib2, base64, httplib
+from ambari_commons.exceptions import TimeoutError
 from resource_management.core.exceptions import Fail
+from resource_management.libraries.functions.decorator import safe_retry
 from resource_management.libraries.functions.format import format
-import re
+
 
 class RangeradminV2:
   sInstance = None
@@ -45,6 +55,7 @@ class RangeradminV2:
     if self.skip_if_rangeradmin_down:
       Logger.info("RangeradminV2: Skip ranger admin if it's down !")
 
+  @safe_retry(times=5, sleep_time=8, backoff_factor=1.5, err_class=Fail, return_on_fail=None)
   def get_repository_by_name_urllib2(self, name, component, status, usernamepassword):
     """
     :param name: name of the component, from which, function will search in list of repositories
@@ -60,7 +71,7 @@ class RangeradminV2:
       request.add_header("Content-Type", "application/json")
       request.add_header("Accept", "application/json")
       request.add_header("Authorization", "Basic {0}".format(base_64_string))
-      result = urllib2.urlopen(request, timeout=20)
+      result = openurl(request, timeout=20)
       response_code = result.getcode()
       response = json.loads(result.read())
       if response_code == 200 and len(response) > 0:
@@ -80,6 +91,8 @@ class RangeradminV2:
     except httplib.BadStatusLine:
       Logger.error("Ranger Admin service is not reachable, please restart the service and then try again")
       return None
+    except TimeoutError:
+      raise Fail("Connection to Ranger Admin failed. Reason - timeout")
       
     
   def create_ranger_repository(self, component, repo_name, repo_properties, 
@@ -119,7 +132,7 @@ class RangeradminV2:
     elif not self.skip_if_rangeradmin_down:
       Logger.error("Connection failed to Ranger Admin !")
 
-          
+  @safe_retry(times=5, sleep_time=8, backoff_factor=1.5, err_class=Fail, return_on_fail=None)
   def create_repository_urllib2(self, data, usernamepassword):
     """
     :param data: json object to create repository
@@ -135,7 +148,7 @@ class RangeradminV2:
       }
       request = urllib2.Request(search_repo_url, data, headers)
       request.add_header("Authorization", "Basic {0}".format(base_64_string))
-      result = urllib2.urlopen(request, timeout=20)
+      result = openurl(request, timeout=20)
       response_code = result.getcode()
       response = json.loads(json.JSONEncoder().encode(result.read()))
 
@@ -154,7 +167,10 @@ class RangeradminV2:
     except httplib.BadStatusLine:
       Logger.error("Ranger Admin service is not reachable, please restart the service and then try again")
       return None
+    except TimeoutError:
+      raise Fail("Connection to Ranger Admin failed. Reason - timeout")
 
+  @safe_retry(times=5, sleep_time=8, backoff_factor=1.5, err_class=Fail, return_on_fail=None)
   def check_ranger_login_urllib2(self, url):
     """
     :param url: ranger admin host url
@@ -162,7 +178,7 @@ class RangeradminV2:
     :return: Returns login check response 
     """
     try:
-      response = urllib2.urlopen(url, timeout=20)
+      response = openurl(url, timeout=20)
       response_code = response.getcode()
       return response_code
     except urllib2.URLError, e:
@@ -174,11 +190,15 @@ class RangeradminV2:
     except httplib.BadStatusLine, e:
       Logger.error("Ranger Admin service is not reachable, please restart the service and then try again")
       return None
+    except TimeoutError:
+      raise Fail("Connection failed to Ranger Admin. Reason - timeout")
 
-  def create_ambari_admin_user(self,ambari_admin_username, ambari_admin_password,usernamepassword):
+  @safe_retry(times=5, sleep_time=8, backoff_factor=1.5, err_class=Fail, return_on_fail=None)
+  def create_ambari_admin_user(self, ambari_admin_username, ambari_admin_password, usernamepassword):
     """
     :param ambari_admin_username: username of user to be created 
-    :param ambari_admin_username: user password of user to be created 
+    :param ambari_admin_password: user password of user to be created
+    :param usernamepassword: user credentials using which repository needs to be searched.
     :return: Returns user credentials if user exists, otherwise returns credentials of the created user.
     """
     flag_ambari_admin_present = False
@@ -192,7 +212,7 @@ class RangeradminV2:
       request.add_header("Content-Type", "application/json")
       request.add_header("Accept", "application/json")
       request.add_header("Authorization", "Basic {0}".format(base_64_string))
-      result = urllib2.urlopen(request, timeout=20)
+      result = openurl(request, timeout=20)
       response_code = result.getcode()
       response = json.loads(result.read())
       if response_code == 200 and len(response['vXUsers']) >= 0:
@@ -224,8 +244,8 @@ class RangeradminV2:
           }
           request = urllib2.Request(url, data, headers)
           request.add_header("Authorization", "Basic {0}".format(base_64_string))
-          result = urllib2.urlopen(request, timeout=20)
-          response_code =  result.getcode()
+          result = openurl(request, timeout=20)
+          response_code = result.getcode()
           response = json.loads(json.JSONEncoder().encode(result.read()))
           if response_code == 200 and response is not None:
             Logger.info('Ambari admin user creation successful.')
@@ -245,3 +265,5 @@ class RangeradminV2:
     except httplib.BadStatusLine:
       Logger.error("Ranger Admin service is not reachable, please restart the service and then try again")
       return None
+    except TimeoutError:
+      raise Fail("Connection to Ranger Admin failed. Reason - timeout")