Browse Source

HADOOP-2847. Ensure idle cluster cleanup works even if the JobTracker becomes unresponsive to RPC calls. Contributed by Hemanth Yamijala.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/trunk@633166 13f79535-47bb-0310-9956-ffa450edef68
Nigel Daley 17 years ago
parent
commit
9df37b8953
2 changed files with 27 additions and 1 deletions
  1. 3 0
      CHANGES.txt
  2. 24 1
      src/contrib/hod/hodlib/RingMaster/idleJobTracker.py

+ 3 - 0
CHANGES.txt

@@ -216,6 +216,9 @@ Release 0.16.1 - Unreleased
     HADOOP-2923.  Add SequenceFileAsBinaryInputFormat, which was
     HADOOP-2923.  Add SequenceFileAsBinaryInputFormat, which was
     missed in the commit for HADOOP-2603. (cdouglas via omalley)
     missed in the commit for HADOOP-2603. (cdouglas via omalley)
 
 
+    HADOOP-2847.  Ensure idle cluster cleanup works even if the JobTracker
+    becomes unresponsive to RPC calls. (Hemanth Yamijala via nigel)
+
 Release 0.16.0 - 2008-02-07
 Release 0.16.0 - 2008-02-07
 
 
   INCOMPATIBLE CHANGES
   INCOMPATIBLE CHANGES

+ 24 - 1
src/contrib/hod/hodlib/RingMaster/idleJobTracker.py

@@ -31,6 +31,13 @@ class HadoopJobStatus:
   def getStatus(self):
   def getStatus(self):
     return self.__status
     return self.__status
 
 
+class HadoopClientException(Exception):
+  """This class represents an exception that is raised when we fail in
+     running the job client."""
+  
+  def __init__(self, errorCode):
+    self.errorCode = errorCode
+  
 class JobTrackerMonitor:
 class JobTrackerMonitor:
   """This class monitors the JobTracker of an allocated cluster
   """This class monitors the JobTracker of an allocated cluster
      periodically to detect whether it is idle. If it is found
      periodically to detect whether it is idle. If it is found
@@ -134,7 +141,17 @@ class JobTrackerMonitor:
 
 
   def __isIdle(self):
   def __isIdle(self):
     """This method checks if the JobTracker is idle beyond a certain limit."""
     """This method checks if the JobTracker is idle beyond a certain limit."""
-    if self.__getJobCount() == 0:
+    jobCount = 0
+    err = False
+
+    try:
+      jobCount = self.__getJobCount()
+    except HadoopClientException, hce:
+      self.__log.debug('HadoopClientException handled in getting job count. \
+                                      Error code: %s' % hce.errorCode)
+      err = True
+
+    if (jobCount==0) or err:
       if self.__firstIdleTime == 0:
       if self.__firstIdleTime == 0:
         #detecting idleness for the first time
         #detecting idleness for the first time
         self.__firstIdleTime = time.time()
         self.__firstIdleTime = time.time()
@@ -145,6 +162,7 @@ class JobTrackerMonitor:
     else:
     else:
       # reset idleness time
       # reset idleness time
       self.__firstIdleTime = 0
       self.__firstIdleTime = 0
+      
     return False
     return False
 
 
   def __getJobCount(self):
   def __getJobCount(self):
@@ -164,6 +182,11 @@ class JobTrackerMonitor:
         match = self.__jobCountRegExp.match(line)
         match = self.__jobCountRegExp.match(line)
         if match:
         if match:
           jobs = int(match.group(1))
           jobs = int(match.group(1))
+    elif jtStatusCommand.exit_code() == 1:
+      # for now, exit code 1 comes for any exception raised by JobClient. If hadoop gets
+      # to differentiate and give more granular exit codes, we can check for those errors
+      # corresponding to network errors etc.
+      raise HadoopClientException(jtStatusCommand.exit_code())
     return jobs
     return jobs
 
 
   def __isCompatibleHadoopVersion(self, expectedVersion):
   def __isCompatibleHadoopVersion(self, expectedVersion):