Bläddra i källkod

Merge -r 668553:668554 from trunk onto 0.18 branch. Fixes HADOOP-3531.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/branches/branch-0.18@668558 13f79535-47bb-0310-9956-ffa450edef68
Devaraj Das 17 år sedan
förälder
incheckning
c715272f27
2 ändrade filer med 50 tillägg och 24 borttagningar
  1. 3 0
      src/contrib/hod/CHANGES.txt
  2. 47 24
      src/contrib/hod/hodlib/HodRing/hodRing.py

+ 3 - 0
src/contrib/hod/CHANGES.txt

@@ -34,6 +34,9 @@ Release 0.18.0 - Unreleased
     HADOOP-3523. Fixes auto-deallocation of cluster if job id is not found in
     Torque's job list (Hemanth Yamijala via ddas)
 
+    HADOOP-3531. Fixes a bug related to handling JobTracker failures because of
+    timing issues on slow nodes. (Hemanth Yamijala via ddas)
+
 Release 0.17.0 - 2008-05-18
 
   INCOMPATIBLE CHANGES

+ 47 - 24
src/contrib/hod/hodlib/HodRing/hodRing.py

@@ -435,31 +435,12 @@ class HadoopCommand:
       if self.__hadoopThread.exit_code() != 0:
         status = False
     else:
-      code = self.__hadoopThread.exit_code()
-      if code != 0 and code != None:
-        status = False
+      status = self.getCommandStatus()
         
     self.log.debug("hadoop run status: %s" % status)    
     
     if status == False:
-      self.log.error('hadoop error: %s' % (
-                       self.__hadoopThread.exit_status_string()))
-      # read the contents of redirected stderr to print information back to user
-      if os.path.exists(self.err):
-        f = None
-        try:
-          f = open(self.err)
-          lines = f.readlines()
-          # format
-          for line in lines:
-            self.stdErrContents = "%s%s" % (self.stdErrContents, line)
-        finally:
-          if f is not None:
-            f.close()
-      self.log.error('See %s.out and/or %s.err for details. They are ' % \
-                     (self.name, self.name) + \
-                     'located at subdirectories under either ' + \
-                     'hodring.work-dirs or hodring.log-destination-uri.')
+      self.handleFailedCommand()
    
     if (status == True) or (not desc.isIgnoreFailures()):
       return status
@@ -476,6 +457,33 @@ class HadoopCommand:
     list.extend(self.workdirs)
     list.append(self.confdir)
 
+  def getCommandStatus(self):
+    status = True
+    ec = self.__hadoopThread.exit_code()
+    if (ec != 0) and (ec != None):
+      status = False
+    return status
+
+  def handleFailedCommand(self):
+    self.log.error('hadoop error: %s' % (
+                     self.__hadoopThread.exit_status_string()))
+    # read the contents of redirected stderr to print information back to user
+    if os.path.exists(self.err):
+      f = None
+      try:
+        f = open(self.err)
+        lines = f.readlines()
+        # format
+        for line in lines:
+          self.stdErrContents = "%s%s" % (self.stdErrContents, line)
+      finally:
+        if f is not None:
+          f.close()
+    self.log.error('See %s.out and/or %s.err for details. They are ' % \
+                   (self.name, self.name) + \
+                   'located at subdirectories under either ' + \
+                   'hodring.work-dirs or hodring.log-destination-uri.')
+
 class HodRing(hodBaseService):
   """The main class for hodring that
   polls the commands it runs"""
@@ -636,9 +644,9 @@ class HodRing(hodBaseService):
 
       # ok.. now command is running. If this HodRing got jobtracker, 
       # Check if it is ready for accepting jobs, and then only return
-      self.__check_jobtracker(desc, id-1)
+      self.__check_jobtracker(desc, id-1, pkgdir)
       
-  def __check_jobtracker(self, desc, id):
+  def __check_jobtracker(self, desc, id, pkgdir):
     # Check jobtracker status. Return properly if it is ready to accept jobs.
     # Currently Checks for Jetty to come up, the last thing that can be checked
     # before JT completes initialisation. To be perfectly reliable, we need 
@@ -649,7 +657,8 @@ class HodRing(hodBaseService):
       self.log.debug("Waiting for jobtracker to initialise")
       version = desc.getVersion()
       self.log.debug("jobtracker version : %s" % version)
-      attrs = self.getRunningValues()[id].getFilledInKeyValues()
+      hadoopCmd = self.getRunningValues()[id]
+      attrs = hadoopCmd.getFilledInKeyValues()
       attrs = parseEquals(attrs)
       jobTrackerAddr = attrs['mapred.job.tracker']
       self.log.debug("jobtracker rpc server : %s" % jobTrackerAddr)
@@ -669,6 +678,20 @@ class HodRing(hodBaseService):
       jettyStatus = False
       jettyStatusmsg = ""
       while sleepTime <= 32:
+        # There is a possibility that the command might fail after a while.
+        # This code will check if the command failed so that a better
+        # error message can be returned to the user.
+        if not hadoopCmd.getCommandStatus():
+          self.log.critical('Hadoop command found to have failed when ' \
+                            'checking for jobtracker status')
+          hadoopCmd.handleFailedCommand()
+          addnInfo = ""
+          if hadoopCmd.stdErrContents is not "":
+            addnInfo = " Information from stderr of the command:\n%s" \
+                                        % (hadoopCmd.stdErrContents)
+          raise Exception("Could not launch the %s using %s/bin/hadoop.%s" \
+                                        % (desc.getName(), pkgdir, addnInfo))
+          
         try:
           jettyConn = httplib.HTTPConnection(jettyAddr)
           jettyConn.request("HEAD", "/jobtracker.jsp")