浏览代码

HADOOP-3464. Implemented a mechanism to transfer HOD errors that occur on compute nodes to the submit node running the HOD client, so users have good feedback on why an allocation failed. (Vinod Kumar Vavilapalli via mukund)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/trunk@663331 13f79535-47bb-0310-9956-ffa450edef68
Mukund Madhugiri 17 年之前
父节点
当前提交
19fafa97ad

+ 3 - 0
src/contrib/hod/CHANGES.txt

@@ -15,6 +15,9 @@ Trunk (unreleased changes)
     HADOOP-3151. Improves error messages when reporting failures due to 
     incorrect parameters passed to HOD. (Vinod Kumar Vavilapalli via ddas)
 
+    HADOOP-3464. Implemented a mechanism to transfer HOD errors that occur on
+    compute nodes to the submit node running the HOD client, so users have good
+    feedback on why an allocation failed. (Vinod Kumar Vavilapalli via mukund)
  
   BUG FIXES
 

+ 25 - 2
src/contrib/hod/bin/hodring

@@ -46,8 +46,11 @@ sys.path.append(libDirectory)
 
 from hodlib.HodRing.hodRing import HodRing
 from hodlib.Common.setup import *
-from hodlib.Common.util import filter_warnings,get_exception_string, get_exception_error_string, getMapredSystemDirectory
+from hodlib.Common.util import filter_warnings, get_exception_string, \
+                get_exception_error_string, getMapredSystemDirectory, \
+                to_http_url, local_fqdn
 from hodlib.Common.logger import getLogger, ensureLogDir
+from hodlib.Common.xmlrpc import hodXRClient
 
 filter_warnings()
 
@@ -255,10 +258,30 @@ if __name__ == '__main__':
       log.critical("exec failed")
       os._exit(1)
 
-  except Exception:
+  except Exception, e:
     if service:
       if service.log:
         log = service.log
     else:
       log = getLogger(hodRingOptions['hodring'], 'hodring')
     log.error("Error in bin/hodring %s. \nStack trace:\n%s" %(get_exception_error_string(),get_exception_string()))
+    
+    log.info("now trying informing to ringmaster")
+    log.info(hodRingOptions['hodring']['ringmaster-xrs-addr'])
+    log.info(hodRingOptions.normalizeValue('hodring', 'ringmaster-xrs-addr'))
+    log.info(to_http_url(hodRingOptions.normalizeValue( \
+            'hodring', 'ringmaster-xrs-addr')))
+    # Report errors to the Ringmaster if possible
+    try:
+      ringXRAddress = to_http_url(hodRingOptions.normalizeValue( \
+                                     'hodring', 'ringmaster-xrs-addr'))
+      log.debug("Creating ringmaster XML-RPC client.")
+      ringClient = hodXRClient(ringXRAddress)    
+      if ringClient is not None:
+        addr = local_fqdn() + "_" + str(os.getpid())
+        ringClient.setHodRingErrors(addr, str(e))
+        log.info("Reported errors to ringmaster at %s" % ringXRAddress)
+    except Exception, e:
+      log.error("Failed to report errors to ringmaster at %s" % ringXRAddress)
+      log.error("Reason : %s" % get_exception_string())
+    # End of reporting errors to the client

+ 39 - 9
src/contrib/hod/bin/ringmaster

@@ -46,9 +46,11 @@ sys.path.append(libDirectory)
 from hodlib.RingMaster.ringMaster import main
 from hodlib.Common.setup import *
 from hodlib.Common.descGenerator import *
-from hodlib.Common.util import local_fqdn, filter_warnings
+from hodlib.Common.util import local_fqdn, filter_warnings, to_http_url, \
+                        get_exception_string, get_exception_error_string
 from hodlib.Common.logger import getLogger, ensureLogDir
-from hodlib.Common.util import get_exception_string, get_exception_error_string
+from hodlib.Common.xmlrpc import hodXRClient
+import logging
 
 filter_warnings()
 
@@ -275,9 +277,16 @@ if __name__ == '__main__':
   confDef = definition()
   confDef.add_defs(defList, defOrder)
   ringMasterOptions = options(confDef, "./%s [OPTIONS]" % myName, VERSION)
-  log = None
+  log = logging.getLogger()
 
   try:
+
+    # Set up logging before anything else.
+    ensureLogDir(ringMasterOptions.normalizeValue('ringmaster', 'log-dir'))
+    log = getLogger(ringMasterOptions['ringmaster'],'ringmaster')
+    # End of setting up logging
+
+    # Verify and process options
     statusMsgs = []
     # Conditional validation
     if not ringMasterOptions['ringmaster'].has_key('hadoop-tar-ball') or \
@@ -291,21 +300,42 @@ if __name__ == '__main__':
                                                   'gridservice-mapred', 'pkgs'))
 
     if len(statusMsgs) != 0:
-      raise Exception("%s" % statusMsgs)
+      # format status messages into a single string
+      errStr = ''
+      for msg in statusMsgs:
+        errStr = "%s%s\n" % (errStr, msg)
+      raise Exception("%s" % errStr)
     # End of conditional validation
 
     (status, statusMsgs) = ringMasterOptions.verify()
     if not status:
-      raise Exception("%s" % statusMsgs)
+      # format status messages into a single string
+      errStr = ''
+      for msg in statusMsgs:
+        errStr = "%s%s\n" % (errStr, msg)
+      raise Exception("%s" % errStr)
+
     ringMasterOptions.replace_escape_seqs()
     ringMasterOptions['ringmaster']['base-dir'] = rootDirectory 
+    # End of option processing
 
-    ensureLogDir(ringMasterOptions['ringmaster']['log-dir'])
-    log = getLogger(ringMasterOptions['ringmaster'],'ringmaster')
     ret = main(ringMasterOptions,log)
     sys.exit(ret)
   except Exception, e:
-    if log:
-      log.error("bin/ringmaster failed to start.%s. Stack trace follows:\n%s" % (get_exception_error_string(),get_exception_string()))
+    log.error("bin/ringmaster failed to start.%s. Stack trace follows:\n%s" % (get_exception_error_string(),get_exception_string()))
+
+    # Report errors to the client if possible
+    try:
+      serviceAddr = to_http_url(ringMasterOptions.normalizeValue( \
+                                     'ringmaster', 'svcrgy-addr'))
+      serviceClient = hodXRClient(serviceAddr)
+      if serviceClient is not None:
+        serviceClient.setRMError([str(e),get_exception_string()])
+        log.info("Reported errors to service registry at %s" % serviceAddr)
+    except Exception, e:
+      log.error("Failed to report errors to service registry.")
+      log.error("Reason : %s" % get_exception_string())
+    # End of reporting errors to the client
+
     # Ringmaster failing to start is a ringmaster error. Exit with the appropriate exit code.
     sys.exit(6)

+ 16 - 10
src/contrib/hod/hodlib/GridServices/service.py

@@ -65,6 +65,7 @@ class MasterSlave(Service):
     self.masterInitialized = False
     self.masterAddress = 'none'
     self.requiredNode = requiredNode
+    self.failedMsg = None
 
   def getRequiredNode(self):
     return self.requiredNode
@@ -149,6 +150,12 @@ class MasterSlave(Service):
 
   def isExternal(self):
     return self.serviceDesc.isExternal()
+
+  def setMasterFailed(self, err):
+    """Record err as the failure message for this service's master."""
+    self.failedMsg = err
+
+  def getMasterFailed(self):
+    """Return the failure message stored in self.failedMsg."""
+    return self.failedMsg
   
 class NodeRequest:
   """ A class to define 
@@ -191,15 +198,16 @@ class ServiceUtil:
       if log: log.debug("Trying to see if port %s is available"% n)
       try:
         s.bind((h, n))
+        if log: log.debug("Yes, port %s is available" % n)
+        avail = True
       except socket.error,e:
         if log: log.debug("Could not bind to the port %s. Reason %s" % (n,e))
         retry -= 1
         pass
-      else:
-        if log: log.debug("Yes, port %s is available" % n)
-        avail = True
-      finally:
-        s.close()
+      # The earlier code here (try/except/else/finally) had a syntax error.
+      # That code path was never followed, so the error remained uncaught
+      # until I stumbled upon it this time.
+      s.close()
 
       if avail:
         ServiceUtil.localPortUsed[n] = True
@@ -229,15 +237,13 @@ class ServiceUtil:
       if log: log.debug("Trying to see if port %s is available"% n)
       try:
         s.bind((h, n))
+        if log: log.debug("Yes, port %s is available" % n)
+        avail = True
       except socket.error,e:
         if log: log.debug("Could not bind to the port %s. Reason %s" % (n,e))
         retry -= 1
         pass
-      else:
-        if log: log.debug("Yes, port %s is available" % n)
-        avail = True
-      finally:
-        s.close()
+      s.close()
 
       if avail:
         ServiceUtil.localPortUsed[n] = True

+ 12 - 2
src/contrib/hod/hodlib/Hod/hadoop.py

@@ -260,7 +260,6 @@ class hadoopCluster:
           if (count % 10 == 0):
             if not self.__check_job_status():
               break
-
     return ringmasterXRS
  
   def __init_hadoop_service(self, serviceName, xmlrpcClient):
@@ -296,6 +295,11 @@ class hadoopCluster:
       self.__log.critical("Failed to retrieve '%s' service address." % 
                           serviceName)
       status = False
+    elif serviceAddress.startswith("Error: "):
+      errs = serviceAddress[len("Error: "):]
+      self.__log.critical("Cluster could not be allocated because of the following errors.\n%s" % \
+                             errs)
+      status = False
     else:
       try:
         self.__svcrgyClient.registerService(self.__cfg['hodring']['userid'], 
@@ -556,7 +560,7 @@ class hadoopCluster:
             else:
               status = 6
             if status != 0:
-              self.__log.info("Cleaning up cluster id %s, as cluster could not be allocated." % self.jobId)
+              self.__log.debug("Cleaning up cluster id %s, as cluster could not be allocated." % self.jobId)
               if ringClient is None:
                 self.delete_job(self.jobId)
               else:
@@ -590,6 +594,12 @@ class hadoopCluster:
         self.__log.critical("Scheduler failure, allocation failed.\n\n")        
         status = 4
     
+    if status == 5 or status == 6:
+      ringMasterErrors = self.__svcrgyClient.getRMError()
+      if ringMasterErrors:
+        self.__log.critical("Cluster could not be allocated because of the following errors on the ringmaster host.\n%s" % \
+                               (ringMasterErrors[0]))
+        self.__log.debug("Stack trace on ringmaster: %s" % ringMasterErrors[1])
     return status
 
   def __isRingMasterAlive(self, rmAddr):

+ 1 - 1
src/contrib/hod/hodlib/Hod/hod.py

@@ -186,7 +186,7 @@ class hodRunner:
   def __setup_service_registry(self):
     cfg = self.__cfg['hod'].copy()
     cfg['debug'] = 0
-    self.__registry = svcrgy(cfg)
+    self.__registry = svcrgy(cfg, self.__log)
     self.__registry.start()
     self.__log.debug(self.__registry.getXMLRPCAddr())
     self.__cfg['hod']['xrs-address'] = self.__registry.getXMLRPCAddr()

+ 18 - 2
src/contrib/hod/hodlib/HodRing/hodRing.py

@@ -234,6 +234,7 @@ class HadoopCommand:
     self._createHadoopSiteXml()
     self._createHadoopLogDir()
     self.__hadoopThread = None
+    self.stdErrContents = "" # store list of contents for returning to user
 
   def _createWorkDirs(self):
     for dir in self.workdirs:
@@ -443,6 +444,18 @@ class HadoopCommand:
     if status == False:
       self.log.error('hadoop error: %s' % (
                        self.__hadoopThread.exit_status_string()))
+      # read the contents of redirected stderr to print information back to user
+      if os.path.exists(self.err):
+        f = None
+        try:
+          f = open(self.err)
+          lines = f.readlines()
+          # format
+          for line in lines:
+            self.stdErrContents = "%s%s" % (self.stdErrContents, line)
+        finally:
+          if f is not None:
+            f.close()
       self.log.error('See %s.out and/or %s.err for details. They are ' % \
                      (self.name, self.name) + \
                      'located at subdirectories under either ' + \
@@ -607,9 +620,12 @@ class HodRing(hodBaseService):
 
         self.log.debug('This is the packcage dir %s ' % (pkgdir))
         if not cmd.run(pkgdir):
-          raise ValueError, "Can't launch command: %s" % pkgdir
+          addnInfo = ""
+          if cmd.stdErrContents is not "":
+            addnInfo = " Information from stderr of the command:\n%s" % (cmd.stdErrContents)
+          raise Exception("Could not launch the %s using %s/bin/hadoop.%s" % (desc.getName(), pkgdir, addnInfo))
       except Exception, e:
-        print get_exception_string()
+        self.log.debug("Exception running hadoop command: %s\n%s" % (get_exception_error_string(), get_exception_string()))
         self.__running[id] = cmd
         raise Exception(e)
 

+ 8 - 5
src/contrib/hod/hodlib/RingMaster/idleJobTracker.py

@@ -85,17 +85,14 @@ class JobTrackerMonitor:
   def getJobTrackerURL(self):
     """This method periodically checks the service info provider for the JT URL"""
     self.__jobTrackerURL = self.__serviceInfoProvider.getServiceAddr('mapred')
-    while not self.__stopFlag and \
-          (self.__jobTrackerURL is None or \
-            self.__jobTrackerURL == 'not found'):
+    while not self.__stopFlag and not self.__isValidJobTrackerURL():
       time.sleep(10)
       if not self.__stopFlag:
         self.__jobTrackerURL = self.__serviceInfoProvider.getServiceAddr('mapred')
       else:
         break
 
-    if (self.__jobTrackerURL != None) and \
-          (self.__jobTrackerURL != 'not found'):
+    if self.__isValidJobTrackerURL():
       self.__log.debug('Got URL %s. Starting monitoring' % self.__jobTrackerURL)
       self.__jtMonitorThread.start()
 
@@ -129,6 +126,12 @@ class JobTrackerMonitor:
       self.__log.debug('Exception while getting job statuses. %s' % get_exception_string())
     return jobStatusList
 
+  def __isValidJobTrackerURL(self):
+    """This method checks that the passed in URL is not one of the special case strings
+       returned by the getServiceAddr API"""
+    return ((self.__jobTrackerURL != None) and (self.__jobTrackerURL != 'not found') \
+              and (not self.__jobTrackerURL.startswith('Error')))
+
   def __extractJobStatus(self, line):
     """This method parses an output line from the job status command and creates
        the JobStatus object if there is a match"""

+ 34 - 6
src/contrib/hod/hodlib/RingMaster/ringMaster.py

@@ -421,7 +421,7 @@ class _LogMasterSources:
     hodring to update any parameters
     its changed for the commands it was
     running"""
-    self.log.debug('Comment: adding master params')
+    self.log.debug('Comment: adding master params from %s' % addr)
     self.log.debug(pformat(vals))
     lock = self.masterParamLock
     lock.acquire()
@@ -431,7 +431,6 @@ class _LogMasterSources:
           if (v.getMasterAddress() == addr):
             v.setMasterParams(vals)
             v.setMasterInitialized()
-
     except:
       self.log.debug(get_exception_string())
       pass
@@ -439,6 +438,27 @@ class _LogMasterSources:
             
     return addr
 
+  def setHodRingErrors(self, addr, errors):
+    """This method is called by the hodrings to update errors 
+      it encountered while starting up"""
+    self.log.critical("Hodring at %s failed with following errors:\n%s" % (addr, errors))
+    lock = self.masterParamLock
+    lock.acquire()
+    try:
+      for v in self.serviceDict.itervalues():
+        if v.isMasterLaunched():
+          if (v.getMasterAddress() == addr):
+            # strip the PID part.
+            idx = addr.rfind('_')
+            if idx is not -1:
+              addr = addr[:idx]
+            v.setMasterFailed("Hodring at %s failed with following errors:\n%s" % (addr, errors))
+    except:
+      self.log.debug(get_exception_string())
+      pass
+    lock.release()
+    return True
+
   def getKeys(self):
     lock= self.masterParamLock
     lock.acquire()
@@ -458,7 +478,10 @@ class _LogMasterSources:
       pass
     else:
       self.log.debug("getServiceAddr service: %s" % service)
-      if (service.isMasterInitialized()):
+      err = service.getMasterFailed()
+      if err is not None:
+        addr = "Error: " + err
+      elif (service.isMasterInitialized()):
         addr = service.getMasterAddrs()[0]
       else:
         addr = 'not found'
@@ -563,7 +586,10 @@ class RingMaster:
       # Determine hadoop Version
       hadoopVers = hadoopVersion(self.__getHadoopDir(), \
                                 self.cfg['hodring']['java-home'], self.log)
-      
+     
+      if (hadoopVers['major']==None) or (hadoopVers['minor']==None):
+        raise Exception('Could not retrive the version of Hadoop.'
+                        + ' Check the Hadoop installation or the value of the hodring.java-home variable.')
       if hdfsDesc.isExternal():
         hdfs = HdfsExternal(hdfsDesc, workDirs, version=int(hadoopVers['minor']))
         hdfs.setMasterParams( self.cfg['gridservice-hdfs'] )
@@ -888,9 +914,11 @@ class RingMaster:
   def __findExitCode(self):
     """Determine the exit code based on the status of the cluster or jobs run on them"""
     xmlrpcServer = ringMasterServer.instance.logMasterSources
-    if xmlrpcServer.getServiceAddr('hdfs') == 'not found':
+    if xmlrpcServer.getServiceAddr('hdfs') == 'not found' or \
+        xmlrpcServer.getServiceAddr('hdfs').startswith("Error: "):
       self.__exitCode = 7
-    elif xmlrpcServer.getServiceAddr('mapred') == 'not found':
+    elif xmlrpcServer.getServiceAddr('mapred') == 'not found' or \
+        xmlrpcServer.getServiceAddr('mapred').startswith("Error: "):
       self.__exitCode = 8
     else:
       clusterStatus = get_cluster_status(xmlrpcServer.getServiceAddr('hdfs'),

+ 20 - 2
src/contrib/hod/hodlib/ServiceRegistry/serviceRegistry.py

@@ -19,9 +19,10 @@ from hodlib.Common.hodsvc import hodBaseService
 from hodlib.Common.threads import loop
 from hodlib.Common.tcp import tcpSocket
 from hodlib.Common.util import get_exception_string
+import logging
 
 class svcrgy(hodBaseService):
-    def __init__(self, config):
+    def __init__(self, config, log=None):
         hodBaseService.__init__(self, 'serviceRegistry', config)
         
         self.__serviceDict = {}
@@ -30,6 +31,10 @@ class svcrgy(hodBaseService):
         self.__locked = {}
         
         self.__serviceDictLock = threading.Lock()
+        self.RMErrorMsgs = None # Ringmaster error messages
+        self.log = log
+        if self.log is None:
+          self.log = logging.getLogger()
     
     def __get_job_key(self, userid, job):
         return "%s-%s" % (userid, job)
@@ -40,7 +45,20 @@ class svcrgy(hodBaseService):
     def _xr_method_getServiceInfo(self, userid=None, job=None, name=None, 
                                   type=None):
         return self.getServiceInfo(userid, job, name, type)
-        
+
+    def _xr_method_setRMError(self, args):
+        """Store args as the Ringmaster error messages, replacing any value
+        stored earlier, and return True."""
+        self.log.debug("setRMError called with %s" % args)
+        self.RMErrorMsgs = args
+        return True
+
+    def _xr_method_getRMError(self):
+        self.log.debug("getRMError called")
+        if self.RMErrorMsgs is not None:
+          return self.RMErrorMsgs
+        else:
+          self.log.debug("no Ringmaster error messages")
+          return False
+
     def registerService(self, userid, job, host, name, type, dict):
         """Method thats called upon by
         the ringmaster to register to the

+ 2 - 2
src/contrib/hod/testing/testHod.py

@@ -131,7 +131,7 @@ class test_InvalidArgsOperations(unittest.TestCase):
   def testInfoNonExistentDirectory(self):
     clusterDir = '/tmp/hod/testInfoNonExistentDirectory'
     self.client._op_info(['info', clusterDir])
-    self.assertTrue(self.log.hasMessage("'%s' is not a valid cluster directory." % (clusterDir), 'critical'))
+    self.assertTrue(self.log.hasMessage("Invalid hod.clusterdir(--hod.clusterdir or -d). %s : No such directory" % (clusterDir), 'critical'))
 
   # Test that deallocation works on a deleted cluster directory
   # by clearing the job, and removing the state
@@ -174,7 +174,7 @@ class test_InvalidArgsOperations(unittest.TestCase):
     self.client._op_deallocate(['deallocate', clusterDir])
     # there should be no call..
     self.assertFalse(self.cluster.wasOperationPerformed('delete_job', None))
-    self.assertTrue(self.log.hasMessage("'%s' is not a valid cluster directory." % (clusterDir), 'critical'))
+    self.assertTrue(self.log.hasMessage("Invalid hod.clusterdir(--hod.clusterdir or -d). %s : No such directory" % (clusterDir), 'critical'))
 
   # Test that allocation on an previously deleted directory fails.    
   def testAllocateOnDeletedDirectory(self):